Emit and Inspect Events
- To understand what events are in LOC.
- To emit events in logic.
- To inspect emitted events as a data lineage graph.
LOC events allow users to generate metadata representing a data flow or data trail, indicating who has sent, used or received data. This information can be very useful for several purposes:
- Active metadata management
- Data collaboration
- Data auditing
Here we will look at a simple example that emits events and then inspects them in Studio.
What are Events?
In LOC, events are user-defined metadata generated with the Event Store Agent. Both the event source and target are user-defined nodes that will be created in Elasticsearch.
Events can be used to exchange information between data processes, or simply as a form of active metadata alongside existing database or file transactions, so that you have a clear picture of when and where the data flowed.
The graphical, network-like representation of events (which may share source or target nodes) in Studio is referred to as data lineage.
There are also system auto-generated events, but we will not discuss them here.
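For example, a logic could emit a single event like the minimal sketch below (the label and node names here are invented for illustration; the full field semantics are covered in the walkthrough later in this article):

```javascript
import { EventAgent } from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // emit one event linking two user-defined nodes
  await EventAgent.emit([
    {
      labelName: "EventUserLogin", // event name (label)
      sourceDID: "user-001", // source node's digital identity
      targetDID: "auth-service", // target node's digital identity
      meta: "login via SSO", // optional metadata payload
      type: "default", // event type (group)
    },
  ]);
}
```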
Emit Events
Use Case: Insurance Policy Application
Imagine you are working for an insurance company and need to build a new process that lets customers submit applications for new policies.
We assume the company offers the following policies:
| Policy Code | Type |
| --- | --- |
| P100 | Permanent life insurance |
| P200 | Health insurance |
| P300 | Liability car insurance |
| P400 | Personal injury protection |
| P500 | Personal property coverage |
The system needs to record who has submitted an application for which policy, so that the company and the customer can track the processing progress.
We'll use a mocked request JSON payload (multiple applications) like this:
```json
[
{
"requestID": "R0042",
"policyCode": "P100",
"applicantID": "A1979"
},
{
"requestID": "R0043",
"policyCode": "P300",
"applicantID": "A1982"
},
{
"requestID": "R0044",
"policyCode": "P500",
"applicantID": "A1982"
}
]
```
So the data process handling these applications should emit 3 events (two applicants sending three policy applications):

```
A1979 -> EventPolicyRequest (meta: P100) -> R0042
A1982 -> EventPolicyRequest (meta: P300) -> R0043
A1982 -> EventPolicyRequest (meta: P500) -> R0044
```
In reality, the payload may be far more complicated, and the data process would have to process and store the application data. But you get the idea.
Code Walkthrough
In this example we'll create a data process Policy Application with the following logic:

| Logic | Name | Purpose |
| --- | --- | --- |
| Generic logic #1 | Payload JSON Parser (source) | Read and parse JSON payload |
| Generic logic #2 | Policy Application Logic | Process parsed JSON and emit events |
| Aggregator logic | Result Aggregator (source) | Finalise task result |
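The walkthrough below focuses on the second logic. For reference, the payload parser could be a minimal sketch like this, assuming an HTTP request trigger with a JSON body (the exact payload shape may vary between LOC versions):

```javascript
import { SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // read the raw trigger payload (assumed to be an HTTP request body)
  const payload = await ctx.payload();
  const data = payload.http?.request?.data ?? [];

  // decode the body bytes into a string and parse it as JSON
  const parsed = JSON.parse(new TextDecoder().decode(new Uint8Array(data)));

  // store the parsed payload in session storage for downstream logic
  await SessionStorageAgent.putJson("parsed", parsed);
}

export async function handleError(ctx, error) {
  // intentionally empty; errors will surface in the task result
}
```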
The Policy Application logic is as below:
```javascript
import {
LoggingAgent,
SessionStorageAgent,
EventAgent,
} from "@fstnetwork/loc-logic-sdk";
export async function run(ctx) {
const parsed = await SessionStorageAgent.get("parsed");
// check if parsed is an array
if (!Array.isArray(parsed)) return;
// filter out items with necessary fields
const filteredList = parsed.filter(
(item) =>
"requestID" in item &&
"policyCode" in item &&
"applicantID" in item,
);
// exit logic if no events can be emitted
if (filteredList.length === 0) return;
// generate event objects
let events = [];
filteredList.forEach((item) => {
events.push({
labelName: "EventPolicyRequest", // label name (event name)
sourceDID: item.applicantID, // source DID
targetDID: item.requestID, // target DID
meta: item.policyCode, // meta payload
type: "default", // event type (group)
});
});
// emit events
await EventAgent.emit(events);
// prepare result
const result = {
emitted_length: events.length,
events: events,
};
// log events
LoggingAgent.info(result);
// write events to result
await SessionStorageAgent.putJson("result", result);
}
export async function handleError(ctx, error) {
// error logging
LoggingAgent.error({
error: true,
errorMessage: error.message,
stack: error.stack,
taskId: ctx.task.taskKey,
});
}
```
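The Result Aggregator logic then reads the session value written above and finalises it as the task result. A minimal sketch, assuming the SDK's ResultAgent, could look like this:

```javascript
import { ResultAgent, SessionStorageAgent } from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // read the result prepared by the Policy Application logic
  const result = await SessionStorageAgent.get("result");

  // finalise the task result returned to the trigger
  ResultAgent.finalize({
    status: "ok",
    taskKey: ctx.task.taskKey,
    data: result,
  });
}

export async function handleError(ctx, error) {
  // report the error as the task result instead
  ResultAgent.finalize({
    status: "error",
    taskKey: ctx.task.taskKey,
    error: error.message,
  });
}
```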
Invoke Data Process
In order to inspect the events in Studio, you'll need to deploy the data process above to LOC (not the local runtime).
You can invoke it with either an API route or manual trigger (see Quick Start).
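If you've linked the data process to an API route, invoking it could be as simple as the following sketch (the host and path here are placeholders, not actual values):

```javascript
// invoke the deployed data process via a hypothetical API route
const response = await fetch("https://your-loc-host/policy-application", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify([
    { requestID: "R0042", policyCode: "P100", applicantID: "A1979" },
    { requestID: "R0043", policyCode: "P300", applicantID: "A1982" },
    { requestID: "R0044", policyCode: "P500", applicantID: "A1982" },
  ]),
});

console.log(await response.json()); // the finalised task result
```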
The finalised result would look like this:
```json
{
"status": "ok",
"taskKey": {
"executionId": "ZOMhIGpLZcE8WUgxx3bohw",
"taskId": "QX32R9PH9By4OjWTo-pJvw"
},
"data": {
"emitted_length": 3,
"events": [
{
"labelName": "EventPolicyRequest",
"sourceDID": "A1979",
"targetDID": "R0042",
"meta": "P100",
"type": "default"
},
{
"labelName": "EventPolicyRequest",
"sourceDID": "A1982",
"targetDID": "R0043",
"meta": "P300",
"type": "default"
},
{
"labelName": "EventPolicyRequest",
"sourceDID": "A1982",
"targetDID": "R0044",
"meta": "P500",
"type": "default"
}
]
}
}
```
Inspect Events
Using Data Discovery
If you execute the data process manually, you can click Emitted Events in the task execution result window, then click the Event button:
This will bring you to the Data Discovery/Events panel, which shows all events emitted by that task.
If you execute the data process with an actual API route trigger, you can still go there and filter events by the task ID (which is included in the returned API response) or by the event label name EventPolicyRequest.
You can also apply a time range filter if you have emitted the events several times.
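Besides the Studio panel, events can also be queried programmatically from other logic using the Event Agent's search feature. The request shape below is an assumption based on the SDK's search interface and may differ in your LOC version:

```javascript
import { EventAgent, LoggingAgent } from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // search events by label name (request fields are assumptions)
  const query = await EventAgent.search({
    queries: [
      {
        field: "label_name",
        type: "match",
        value: "EventPolicyRequest",
      },
    ],
    excludes: [],
    filters: [],
    sorts: [],
    from: 0,
    size: 1000,
  });

  // log how many matching events were found
  LoggingAgent.info({ count: query?.events?.length ?? 0 });
}
```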
You can see the three events we've just emitted. You can expand the events to see more details (including the meta payload):
Now click Event Lineage Graph to see the graphical data lineage:
You can clearly see that this is the graphical representation of the events: applicant A1979 sent an application request R0042, and so on. Both A1979 and R0042 have become nodes, each a digital or virtual identity acting as a data sender or receiver.
In the next article, we will see how we can extend this data flow even further, creating a longer data trail as the applications get processed.