Create a Data-Driven Microservice with Database
- To create a microservice consisting of two data processes.
- To create a database configuration and read/write the database in logic.
LOC data processes, once linked to at least one API route, can serve as RESTful-like microservices. In this tutorial, we'll see how to create a simple logging microservice that reads from and writes to a database table.
The "microservice" we'll create should be able to do the following things:
- Accept a GET request to query and return logs from the database
- Accept a POST request with a JSON payload to add logs to the database
- Serve both actions from the same HTTP endpoint, for example /api/db-tutorial/v1/logs
In this post we'll use a Microsoft SQL Server database deployed on Amazon AWS. The SQL statements in this tutorial are hence designed for SQL Server only and need to be modified for other types of databases.
Currently LOC supports the following types of databases:
- MySQL
- PostgreSQL
- MS SQL Server
- Oracle Database
Database Table Schema
For this tutorial, we'll create a table Log with a simple schema as below:
Field | Type |
---|---|
ID | INT IDENTITY NOT NULL |
Message | TEXT |
Timestamp | DATETIME |
The ID field is defined as auto-increment (IDENTITY; equivalent to AUTO_INCREMENT in other SQL variations).
MS-SQL Statements for Create and Drop Table
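Note that the statements differ between databases. The schema name dbo and the batch separator GO are specific to MS SQL Server (GO is recognized by client tools such as sqlcmd and SSMS rather than by the server itself, which is why the statements built in the logic code omit it).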
Create table
CREATE TABLE dbo.Log (
ID INT IDENTITY NOT NULL,
Message TEXT,
Timestamp DATETIME
);
GO
Delete all rows in table
This does not reset the auto-increment number of ID.
DELETE FROM dbo.Log;
GO
Drop (remove) table
DROP TABLE dbo.Log;
GO
Create a Database Agent Configuration
As with the HTTP agent configuration set up in another tutorial, go to Administration/Agent Configuration and add a database configuration under one of the folders.
Since our MS SQL Server is for testing purposes only, Trust Certificate is set to True.
RESTful Actions, API Routes and Data Processes
For the sake of demonstration, our "microservice" supports only two actions:
Action | HTTP Method/API Route | Data Process |
---|---|---|
Insert logs | POST /api/db-tutorial/v1/logs | log-service-post |
Query and return latest N (default 1000) logs | GET /api/db-tutorial/v1/logs?limit=n | log-service-get |
Each action is mapped to its own API route and data process. We'll take a look at these two data processes and their logic.
log-service-post: POST Log Messages
Input Data
The service accepts a payload which may include one or more log messages:
[
{
"Message": "Life, Universe and Everything"
},
{
"Message": "42"
},
{
"Message": "Don't Panic"
}
]
Since ID is auto-incremented and Timestamp is generated when the service processes the messages, we only need to provide the messages.
To keep the demonstration simple, the POST service will not return anything except an "ok" or "error" status.
SQL Statement
The insert SQL statement, written as a prepared statement, looks like this:
INSERT INTO dbo.Log
(Message, Timestamp)
VALUES
(@P1, @P2),
(@P3, @P4),
(@P5, @P6),
...;
GO
The @P1, @P2... are placeholders for MS SQL Server. In MySQL the placeholder would be ?, and in PostgreSQL it would be $1, $2...
Each set of values corresponds to a message and its timestamp (an ISO 8601 string).
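The placeholder differences described above can be sketched as a small helper. This is only an illustration: placeholder() and buildInsert() are hypothetical names that are not part of the LOC SDK, the dialect labels are made up for this example, and the table name is passed in because dbo.Log only applies to MS SQL Server.

```javascript
// Hypothetical helper: returns the placeholder for the n-th parameter (1-based).
function placeholder(dialect, n) {
  switch (dialect) {
    case "mssql":
      return `@P${n}`;
    case "mysql":
      return "?";
    case "postgres":
      return `$${n}`;
    default:
      throw new Error(`unsupported dialect: ${dialect}`);
  }
}

// Hypothetical helper: builds a multi-row INSERT and its flat parameter array.
// Every message is paired with the same ISO 8601 timestamp.
function buildInsert(dialect, table, messages, now = new Date()) {
  const timestamp = now.toISOString();
  const rows = [];
  const params = [];
  messages.forEach((message) => {
    const p1 = placeholder(dialect, params.length + 1);
    const p2 = placeholder(dialect, params.length + 2);
    rows.push(`(${p1}, ${p2})`);
    params.push(message, timestamp);
  });
  if (rows.length === 0) throw new Error("no messages to insert");
  return {
    statement: `INSERT INTO ${table} (Message, Timestamp) VALUES ${rows.join(", ")};`,
    params,
  };
}
```

The parameter array stays the same for all three dialects; only the placeholder text in the statement changes.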
Logic Design
Logic | Name | Purpose | DB Config Ref |
---|---|---|---|
Generic logic #1 | payload-json-parser (source) | Parse payload to JSON object | |
Generic logic #2 | log-service-post-logic | Read parsed payload and generate SQL statements/parameters | |
Generic logic #3 | database-query (source) | Query database | comx |
Aggregator logic | log-service-aggregator | Finalise service result | |
Code for two of the logic (generic #1 and #3) can be found in the Logic Module Library (the database query logic handles both select and action queries).
The database-query logic accepts the SQL statement and parameters from another logic, in this case log-service-post-logic.
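To picture the handoff between the two logic, here is a minimal sketch that replaces session storage with an in-memory Map. FakeSessionStorage, producer() and consumer() are stand-ins invented for this illustration; in a real data process the SessionStorageAgent from the SDK plays this role, and its behaviour for a missing key is assumed here to be a null return.

```javascript
// In-memory stand-in for session storage (an assumption for illustration only).
const session = new Map();
const FakeSessionStorage = {
  async putJson(key, value) {
    session.set(key, JSON.stringify(value));
  },
  async get(key) {
    return session.has(key) ? JSON.parse(session.get(key)) : null;
  },
};

// Producer: what a logic such as log-service-post-logic leaves behind.
async function producer() {
  await FakeSessionStorage.putJson("sql", {
    configName: "comx",
    statement: "INSERT INTO dbo.Log (Message, Timestamp) VALUES (@P1, @P2);",
    params: ["Don't Panic", new Date().toISOString()],
  });
}

// Consumer: the checks a logic such as database-query runs before using the object.
async function consumer() {
  const sql = await FakeSessionStorage.get("sql");
  if (!sql) return null; // nothing was handed over, so there is nothing to do
  for (const field of ["configName", "statement", "params"]) {
    if (!(field in sql)) throw new Error(`sql is missing field: ${field}`);
  }
  return sql;
}
```

Because the only contract between the two logic is the shape of the sql object, the same database-query logic can be reused by any data process that writes an object with these three fields.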
The other two are built specifically for the service:
- Generic logic #1
- Generic logic #2
- Generic logic #3
- Aggregator logic
import { LoggingAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read payload
const payload = await ctx.payload();
let data = null;
if ("http" in payload) {
data = payload.http.request.data;
} else if ("messageQueue" in payload) {
data = payload.messageQueue.data;
} else {
throw new Error("this logic only accepts http/mq payload");
}
let parsed = null;
try {
// decode body from Uint8Array to string and convert to JSON
parsed = JSON.parse(new TextDecoder().decode(new Uint8Array(data)));
} catch (e) {
LoggingAgent.warn({
error: true,
errorMessage: `unable to parse JSON payload: ${e.message}`,
stack: e.stack,
taskKey: ctx.task.taskKey,
});
}
// log parsed JSON
LoggingAgent.info({ parsed: parsed });
// write the parsed data into session storage
await SessionStorageAgent.putJson("parsed", parsed);
}
export async function handleError(ctx, error) {
// error logging
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskKey: ctx.task.taskKey,
});
}
import { LoggingAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read parsed JSON payload
const parsed = await SessionStorageAgent.get("parsed");
// log payload
LoggingAgent.info({
parsed: parsed,
});
if (!Array.isArray(parsed)) throw new Error("payload has to be an array");
// generate sql object for select/insert query
let values = [];
let params = [];
let params_id = 1;
// generating SQL value placeholders and parameter array
parsed.forEach((msg) => {
if ("Message" in msg || "message" in msg) {
values.push(`((@P${params_id}), (@P${params_id + 1}))`);
const new_msg = msg.Message ? msg.Message : msg?.message;
const new_dt = new Date().toISOString();
params.push(new_msg);
params.push(new_dt);
params_id += 2;
}
});
// an empty array is truthy, so check its length instead
if (values.length === 0) throw new Error("no messages to insert");
const sql = {
configName: "comx",
statement: `INSERT INTO dbo.Log (Message, Timestamp) VALUES ${values.join(
", ",
)};`,
params: params,
};
// log sql
LoggingAgent.info({
sql: sql,
});
// write sql into session storage
await SessionStorageAgent.putJson("sql", sql);
}
export async function handleError(ctx, error) {
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskId: ctx.task.taskKey,
});
}
import {
DatabaseAgent,
LoggingAgent,
SessionStorageAgent,
} from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read sql object from session
const sql = await SessionStorageAgent.get("sql");
/* or comment out the line above and replace sql object with the following declaration:
const sql = {
configName: "db config name",
statement: "select * from table where col_1 = ? and col_2 = ?;",
params: ["value_1", "value_2"],
}
*/
// skip logic if sql session variable does not exist (is null)
if (!sql) return;
// throws an error if fields in sql are missing
if (
!(typeof sql == "object") ||
!("configName" in sql) ||
!("statement" in sql) ||
!("params" in sql)
)
throw new Error("sql is not an object or has missing fields!");
// log sql object
LoggingAgent.info({ sql: sql });
let dbClient = null;
let resp = null;
let db_query_status = "error";
let db_error = null;
try {
// acquire database client
dbClient = await DatabaseAgent.acquire(sql.configName);
LoggingAgent.info("database client acquired");
LoggingAgent.info({
dataSourceId: dbClient.uid.dataSourceId,
connectionId: dbClient.uid.connectionId,
});
// run select or action query
if (sql.statement.toLowerCase().includes("select")) {
resp = await dbClient?.query(sql.statement, sql.params);
} else {
await dbClient?.execute(sql.statement, sql.params);
}
db_query_status = "ok";
} catch (e) {
db_error = {
error: true,
errorMessage: `database query error: ${e.message}`,
stack: e.stack,
taskKey: ctx.task.taskKey,
};
LoggingAgent.error(db_error);
} finally {
// release database connection
await dbClient?.release();
}
// log query result and db action status
LoggingAgent.info({ db_resp: resp });
LoggingAgent.info(`db_query_status: ${db_query_status}`);
// write query result, query status and error into session storage
await SessionStorageAgent.putJson("db_resp", resp);
await SessionStorageAgent.putString("db_query_status", db_query_status);
await SessionStorageAgent.putJson("db_error", db_error);
}
export async function handleError(ctx, error) {
// error logging
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskKey: ctx.task.taskKey,
});
}
import {
LoggingAgent,
ResultAgent,
SessionStorageAgent,
} from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read database query status
const db_query_status = await SessionStorageAgent.get("db_query_status");
// read database response
const db_resp = await SessionStorageAgent.get("db_resp");
// read database error
const db_error = await SessionStorageAgent.get("db_error");
const result = {
status: db_query_status,
taskId: ctx.task.taskKey,
data: db_resp?.rows || [],
};
if (db_error) result.error = db_error;
// finalise result
ResultAgent.finalize(result);
}
export async function handleError(ctx, error) {
const err = {
error: true,
errorMessage: error.message,
stack: error.stack,
taskKey: ctx.task.taskKey,
};
// error logging
LoggingAgent.error(err);
// finalise result
ResultAgent.finalize(err);
}
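The database-query logic above chooses between query() and execute() by testing whether the statement contains the word "select" anywhere. That test also matches statements that merely mention it, such as an action query on a table whose name includes "select". A stricter check on the leading keyword could look like the sketch below; returnsRows() is a hypothetical helper, not part of the SDK, and it does not handle statements that begin with comments or a WITH clause.

```javascript
// Hypothetical helper: true only when the first keyword of the statement is SELECT.
function returnsRows(statement) {
  const match = /^\s*([a-z]+)/i.exec(statement);
  return match !== null && match[1].toLowerCase() === "select";
}
```

With this helper, the branch in the logic would test returnsRows(sql.statement) instead of sql.statement.toLowerCase().includes("select").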
log-service-get: GET Log Messages
Output Data
The GET service should return a task result like this:
{
"status": "ok",
"taskId": {
"executionId": "...",
"taskId": "..."
},
"data": [
{
"ID": 3,
"Message": "Don't Panic",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
},
{
"ID": 2,
"Message": "42",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
},
{
"ID": 1,
"Message": "Life, Universe and Everything",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
}
]
}
SQL Statement
The select SQL statement, written as a prepared statement, looks like this:
SELECT TOP (@P1) *
FROM dbo.Log
ORDER BY
Timestamp DESC,
ID DESC;
GO
The TOP clause is equivalent to LIMIT at the end of the statement in other SQL variations.
The @P1 parameter is the QueryString parameter limit from the GET request. If it is not provided, the service uses the default value of 1000.
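Query string values always arrive as strings, so the limit has to be converted and checked before it is passed as @P1. A minimal sketch of that step follows, assuming that anything other than a positive integer should fall back to the default; limitFromQuery() is a hypothetical helper name.

```javascript
const DEFAULT_LIMIT = 1000;

// Hypothetical helper: extracts "limit" from a raw query string such as "limit=50".
function limitFromQuery(query) {
  const params = Object.fromEntries(new URLSearchParams(query ?? ""));
  // Number() never throws; it yields NaN for non-numeric or missing input
  const n = Number(params.limit);
  return Number.isInteger(n) && n > 0 ? n : DEFAULT_LIMIT;
}
```

Number() does not throw on bad input, so a try/catch around the conversion would not catch anything; the explicit integer check is what rejects values such as "abc" or "-5".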
Logic Design
Logic | Name | Purpose | DB Config Ref |
---|---|---|---|
Generic logic #1 | querystring-parser (source) | Parse QueryString to object | |
Generic logic #2 | log-service-get-logic | Read parsed QueryString and generate SQL statements/parameters | |
Generic logic #3 | database-query (source) | Query database and return results | comx |
Aggregator logic | log-service-aggregator (see log-service-post) | Finalise service result | |
Like the first data process, code for two of the logic (generic #1 and #3) can be found in the Logic Module Library. This means the two data processes actually share 50% of the code with reusable logic modules.
Other than using a QueryString parser this time, the second logic is different as well:
- Generic logic #1
- Generic logic #2
import {
LoggingAgent,
SessionStorageAgent
} from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read payload
const payload = await ctx.payload();
if (!("http" in payload)) throw new Error("this logic only accepts http request");
// get querystring (skip if empty)
const query = payload.http.request.query;
if (!query) return;
let params = null;
try {
// parse querystring to object
params = Object.fromEntries(new URLSearchParams(query));
} catch (e) {
LoggingAgent.warn({
error: true,
errorMessage: `unable to parse QueryString parameters: ${e.message}`,
stack: e.stack,
taskKey: ctx.task.taskKey,
});
}
// log parsed querystring
LoggingAgent.info({ params: params });
// write the parsed data into session storage
await SessionStorageAgent.putJson("params", params);
}
export async function handleError(ctx, error) {
// error logging
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskKey: ctx.task.taskKey,
});
}
import { LoggingAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read parsed querystring
const params = await SessionStorageAgent.get("params");
// log params
LoggingAgent.info({
params: params,
});
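// get limit (keep the default when params is missing or limit is not a positive integer)
let limit = 1000; // default
if (params && "limit" in params) {
// Number() returns NaN instead of throwing, so validate the result explicitly
const n = Number(params.limit);
if (Number.isInteger(n) && n > 0) limit = n;
}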
// generate sql object for select/insert query
const sql = {
configName: "comx",
statement:
"SELECT TOP (@P1) * FROM dbo.Log ORDER BY Timestamp DESC, ID DESC;",
params: [limit], // query 1000 rows by default
};
// log sql
LoggingAgent.info({
sql: sql,
});
// write sql into session storage
await SessionStorageAgent.putJson("sql", sql);
}
export async function handleError(ctx, error) {
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskId: ctx.task.taskKey,
});
}
Adding Database Configuration Reference
When adding the database-query logic to both data processes, you should add a database configuration reference name, which in our example is comx.
API Routes
Finally create API routes for both data processes:
HTTP Method | API Route URL | Request Mode | Response Content Type | Linked Data Process |
---|---|---|---|---|
POST | /api/db-tutorial/v1/logs | Sync | JSON | log-service-post |
GET | /api/db-tutorial/v1/logs | Sync | AUTO | log-service-get |
You can see that both API routes share the same path but accept different HTTP methods, so they invoke different data processes that implement the corresponding service logic.
Invoke the Service
After everything is in place, you should be able to invoke the service with either GET or POST requests.
You can also test the data processes with Single Data Process Execution.
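As a rough sketch of what such requests look like from a client, the helpers below build the URL and options for each call. BASE_URL is a hypothetical placeholder for your own LOC API address, buildPostRequest() and buildGetRequest() are illustrative names, and any authentication headers your environment requires are not shown.

```javascript
// Hypothetical base URL; replace it with the address of your LOC API routes.
const BASE_URL = "https://loc.example.com";
const LOGS_PATH = "/api/db-tutorial/v1/logs";

// Builds a POST request carrying one payload item per message.
function buildPostRequest(messages) {
  return {
    url: `${BASE_URL}${LOGS_PATH}`,
    options: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(messages.map((m) => ({ Message: m }))),
    },
  };
}

// Builds a GET request, adding ?limit=n only when a limit is given.
function buildGetRequest(limit) {
  const url = new URL(LOGS_PATH, BASE_URL);
  if (limit !== undefined) url.searchParams.set("limit", String(limit));
  return { url: url.toString(), options: { method: "GET" } };
}

// Usage (needs network access and a runtime with fetch, such as Node.js 18+):
// const { url, options } = buildGetRequest(10);
// const body = await (await fetch(url, options)).json();
```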
Further Development
Adding More RESTful Actions
Although we only implemented two actions here, more can be added on the same principle, for example PATCH (update log messages) and DELETE (delete log messages), each with its own API route and data process.
Example payload and logic for UPDATE and DELETE services
Here are two more code examples, for PATCH and DELETE. Both data processes share the same logic as the POST service except for the second logic:
- PATCH (update)
- DELETE
API Route
HTTP Method | API Route URL | Request Mode | Response Content Type | Linked Data Process |
---|---|---|---|---|
PATCH | /api/db-tutorial/v1/logs | Sync | JSON | log-service-patch |
JSON payload
[
{
"id": "id of message 1",
"message": "new message 1"
},
{
"id": "id of message 2",
"message": "new message 2"
},
{
"id": "id of message 3",
"message": "new message 3"
}
]
SQL statement
UPDATE dbo.Log
SET
message = (
CASE id
WHEN @P1 THEN @P2
WHEN @P3 THEN @P4
...
ELSE message
END
);
GO
Logic
import { LoggingAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read parsed JSON payload
const parsed = await SessionStorageAgent.get("parsed");
// log payload
LoggingAgent.info({
parsed: parsed,
});
if (!Array.isArray(parsed)) throw new Error("payload has to be an array");
// generate sql object for update query
let cases = [];
let params = [];
let params_id = 1;
// generating SQL value placeholders and parameter array
parsed.forEach((update) => {
if (
("id" in update || "ID" in update) &&
("Message" in update || "message" in update)
) {
const id = update.id ? update.id : update?.ID;
const new_msg = update.Message ? update.Message : update?.message;
cases.push(`WHEN @P${params_id} THEN @P${params_id + 1}`);
params.push(id);
params.push(new_msg);
params_id += 2;
}
});
// an empty array is truthy, so check its length instead
if (cases.length === 0) throw new Error("no messages to be updated");
const sql = {
configName: "comx",
statement: `UPDATE dbo.Log SET message = (CASE id ${cases.join(" ")} ELSE message END);`,
params: params,
};
// log sql
LoggingAgent.info({
sql: sql,
});
// write sql into session storage
await SessionStorageAgent.putJson("sql", sql);
}
export async function handleError(ctx, error) {
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskId: ctx.task.taskKey,
});
}
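Note that the UPDATE statement in the PATCH example has no WHERE clause, so it rewrites every row in the table; rows without a matching id simply get their own message back through ELSE. One possible variant, sketched below, restricts the update to the listed ids. buildUpdate() is a hypothetical helper, each id is bound a second time under its own placeholder because reusing a placeholder is not assumed to be supported, and the statement has not been verified against a real server.

```javascript
// Hypothetical helper: builds an UPDATE limited to the ids present in the payload.
function buildUpdate(updates) {
  const cases = [];
  const ids = [];
  const params = [];
  updates.forEach((u) => {
    const id = u.id ?? u.ID;
    const message = u.message ?? u.Message;
    if (id === undefined || message === undefined) return; // skip incomplete items
    cases.push(`WHEN @P${params.length + 1} THEN @P${params.length + 2}`);
    params.push(id, message);
    ids.push(id);
  });
  if (cases.length === 0) throw new Error("no messages to be updated");
  // the ids are appended after the CASE parameters, each with a fresh placeholder
  const inList = ids.map((id) => {
    params.push(id);
    return `@P${params.length}`;
  });
  return {
    configName: "comx",
    statement: `UPDATE dbo.Log SET Message = (CASE ID ${cases.join(" ")} ELSE Message END) WHERE ID IN (${inList.join(", ")});`,
    params,
  };
}
```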
API Route
HTTP Method | API Route URL | Request Mode | Response Content Type | Linked Data Process |
---|---|---|---|---|
DELETE | /api/db-tutorial/v1/logs | Sync | JSON | log-service-delete |
JSON payload
[
{
"id": "id of message 1"
},
{
"id": "id of message 2"
},
{
"id": "id of message 3"
}
]
SQL statement
DELETE FROM dbo.Log
WHERE
id IN (@P1, @P2, @P3...);
GO
Logic
import { LoggingAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
// read parsed JSON payload
const parsed = await SessionStorageAgent.get("parsed");
// log payload
LoggingAgent.info({
parsed: parsed,
});
if (!Array.isArray(parsed)) throw new Error("payload has to be an array");
// generate sql object for delete query
let values = [];
let params = [];
let params_id = 1;
// generating SQL value placeholders and parameter array
parsed.forEach((IDs) => {
if ("id" in IDs || "ID" in IDs) {
values.push(`@P${params_id++}`);
const new_id = IDs.id ? IDs.id : IDs?.ID;
params.push(new_id);
}
});
// an empty array is truthy, so check its length instead
if (values.length === 0) throw new Error("no ids to be deleted");
const sql = {
configName: "comx",
statement: `DELETE FROM dbo.Log WHERE id IN (${values.join(", ")});`,
params: params,
};
// log sql
LoggingAgent.info({
sql: sql,
});
// write sql into session storage
await SessionStorageAgent.putJson("sql", sql);
}
export async function handleError(ctx, error) {
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskId: ctx.task.taskKey,
});
}
Recreate RESTful Responses
The execution result shown above includes the execution metadata. You can omit this information by turning off the API route's encapsulation.
For example, the aggregator can return just the queried database result:
const messages = db_resp?.rows || [];
ResultAgent.finalize(messages);
And the API route would return
[
{
"ID": 3,
"Message": "Don't Panic",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
},
{
"ID": 2,
"Message": "42",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
},
{
"ID": 1,
"Message": "Life, Universe and Everything",
"Timestamp": "2023-05-24T04:07:36.686666666Z"
}
]
When the API route's encapsulation is set to false, only the task result (the object passed to ResultAgent.finalize()) is returned, which means you have full control of the service's response. This may come in handy if you wish to integrate LOC data processes with your legacy applications or other services.
One limitation is that LOC tasks cannot return a custom HTTP status code, for example 403 or 500 to indicate that something went wrong in the service. The HTTP code will always be 200 (OK) as long as the execution itself completes successfully.
This should be taken into consideration if your legacy systems rely on the HTTP response code to make decisions.
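A client can work around this by inspecting the response body instead of the HTTP code. The sketch below assumes encapsulation is off and that the body is the object built by the log-service-aggregator shown earlier (with status, data and an optional error field) or the error object from its handleError function; interpretResult() is a hypothetical client-side helper.

```javascript
// Hypothetical client-side check: the body, not the HTTP code, decides success.
function interpretResult(httpStatus, body) {
  if (httpStatus !== 200) return { ok: false, reason: `HTTP ${httpStatus}` };
  // shape produced by the aggregator's handleError(): { error: true, errorMessage, ... }
  if (body?.error === true) {
    return { ok: false, reason: body.errorMessage ?? "unknown error" };
  }
  // shape produced by the aggregator's run(): { status, taskId, data, error? }
  if (body?.status === "ok") return { ok: true, data: body.data ?? [] };
  return {
    ok: false,
    reason: body?.error?.errorMessage ?? "service reported a failure",
  };
}
```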