Version: LOC v0.9 (legacy)

Events and Multiple Data Processes

Learning Objective
  • To query events emitted from another data process.
  • To inspect a larger data lineage consisting of multiple types of events.

This is a follow-up to Emit and Inspect Events, in which we learned the basics of emitting events.

In reality, your company may have much longer business processes that span several departments. LOC events play an important role in communicating between data processes, as well as in tracking how data flows.

For example, now that we have some policy applications coming in, we need to assign them to insurance agents. We will, of course, build a second data process to achieve this.

info

How to trigger the second data process depends on your use case, but generally you can:

  1. Have the second data process run on a scheduler and check for events periodically;
  2. Have the first data process invoke an API route or message queue trigger that starts the second data process;
  3. Have human operators start it manually.

In this example we will execute it manually.

Query Events

Use Case: Assigning Insurance Agents

This new data process will query the EventPolicyRequest events emitted in the previous tutorial, extract the request ID and policy code, and then assign each request to the agent responsible for that type of policy.

We will use a mock agent mapping table (instead of querying one from a database):

| Agent name | Responsible for  |
| ---------- | ---------------- |
| Marvin     | P100, P200       |
| Trillian   | P300, P400, P500 |

So if the requested insurance policy is type P100, it will be assigned to Marvin, and so on.
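Since the table is organized by agent but each request carries a policy code, it helps to invert it into a policy code to agent lookup. The following is a minimal sketch in plain JavaScript; AGENT_MAPPING, buildPolicyToAgent and findAgent are hypothetical names, not part of the LOC SDK:

```javascript
// Mock agent mapping table from above: agent name -> policy codes.
// (Hypothetical constant; in practice this could come from a database.)
const AGENT_MAPPING = {
  Marvin: ["P100", "P200"],
  Trillian: ["P300", "P400", "P500"],
};

// Invert the table into a policy code -> agent name lookup.
function buildPolicyToAgent(mapping) {
  const policyToAgent = new Map();
  for (const [agent, policies] of Object.entries(mapping)) {
    for (const policy of policies) {
      policyToAgent.set(policy, agent);
    }
  }
  return policyToAgent;
}

const policyToAgent = buildPolicyToAgent(AGENT_MAPPING);

// Returns the responsible agent, or undefined for an unknown policy code.
function findAgent(policyCode) {
  return policyToAgent.get(policyCode);
}

console.log(findAgent("P100")); // Marvin
console.log(findAgent("P500")); // Trillian
```

Returning undefined for an unknown code (instead of a default agent) lets the caller decide how to handle requests that nobody is responsible for.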

Code Walkthrough

This time we'll create the second data process policy-assign:

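| Logic            | Name                        | Purpose                                                  |
| ---------------- | --------------------------- | -------------------------------------------------------- |
| Generic logic #1 | policy-query                | Query EventPolicyRequest events and write them into the session |
| Generic logic #2 | policy-assign               | Read from the session and emit EventAssignAgent events   |
| Aggregator logic | result-aggregator (source)  | Finalise the task result                                 |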

We don't need payload-json-parser this time, since this data process does not require a trigger payload. Instead, policy-query will query the events we are looking for and write them into the parsed session variable.

policy-query
import {
  LoggingAgent,
  SessionStorageAgent,
  EventAgent,
} from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // event query conditions
  const requests = {
    queries: [
      {
        field: "label_name", // field name
        type: "match", // matching operator
        value: "EventPolicyRequest", // value (event name)
      },
    ],
    excludes: [],
    filters: [],
    from: 0,
    size: 1000,
    sorts: [],
  };

  // query events
  const query = await EventAgent.search(requests);
  const events = query?.events;

  // an empty array is truthy, so check the length explicitly
  if (events && events.length > 0) {
    // log queried events
    LoggingAgent.info({ queried_events: events });

    // overwrite parsed with events if not empty
    await SessionStorageAgent.putJson("parsed", events);
  }
}

export async function handleError(ctx, error) {
  // error logging
  LoggingAgent.error({
    error: true,
    errorMessage: error.message,
    stack: error.stack,
    taskId: ctx.task.taskKey,
  });
}
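The walkthrough does not list the code of policy-assign itself. As a rough sketch, its core step could be a pure function that turns (request ID, policy code) pairs into event objects with the same fields as the EventAssignAgent events in the sample result of this page (labelName, sourceDID, targetDID, meta, type). This assumes the request ID and policy code have already been extracted from the queried EventPolicyRequest events (which fields hold them depends on how they were emitted in the previous tutorial); reading the parsed session variable and emitting the events with the SDK, as in Emit and Inspect Events, are left out. buildAssignEvents is a hypothetical name:

```javascript
// Hypothetical helper for the policy-assign logic. Assumes requests are
// already reduced to { requestId, policyCode } records; session access and
// event emission through the LOC SDK are intentionally not shown.
function buildAssignEvents(requests, policyToAgent) {
  const events = [];
  const unassigned = [];
  for (const { requestId, policyCode } of requests) {
    const agent = policyToAgent[policyCode];
    if (!agent) {
      // No agent is responsible for this policy code; keep it for review.
      unassigned.push(requestId);
      continue;
    }
    events.push({
      labelName: "EventAssignAgent",
      sourceDID: requestId, // request ID
      targetDID: agent, // responsible agent
      meta: policyCode, // policy code
      type: "default",
    });
  }
  return { events, unassigned };
}

// Policy code -> agent lookup, following the mock mapping table.
const policyToAgent = {
  P100: "Marvin",
  P200: "Marvin",
  P300: "Trillian",
  P400: "Trillian",
  P500: "Trillian",
};

// Request IDs and policy codes taken from the sample result on this page.
const { events, unassigned } = buildAssignEvents(
  [
    { requestId: "R0042", policyCode: "P100" },
    { requestId: "R0043", policyCode: "P300" },
    { requestId: "R0044", policyCode: "P500" },
  ],
  policyToAgent,
);

console.log(events.map((e) => `${e.sourceDID} -> ${e.targetDID}`).join(", "));
// R0042 -> Marvin, R0043 -> Trillian, R0044 -> Trillian
```

Collecting unmatched request IDs separately, rather than throwing, keeps one unknown policy code from blocking the assignment of every other request in the batch.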
Filter events within a timespan

In the example above, EventAgent returns up to 1,000 events (size: 1000) on every run. It is sensible to do some double-checking so that you won't process the same event twice. You can also restrict the query to events within a certain timespan:

...
const now = Date.now(); // current time in milliseconds
const requests = {
  queries: [
    {
      field: "label_name",
      type: "match",
      value: "EventPolicyRequest",
    },
  ],
  excludes: [],
  filters: [ // add a filter condition
    {
      field: "timestamp",
      gte: now - 10 * 60 * 1000, // greater than or equal (10 minutes ago)
      lte: now, // less than or equal
      type: "range",
    },
  ],
  from: 0,
  size: 1000,
  sorts: [],
};

...

Both gte and lte take a number: a Unix timestamp in milliseconds, which is what Date.now() returns.

The query parameters above will look for events with the label name EventPolicyRequest emitted within the last 10 minutes.
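To reuse the same kind of filter for other windows, the range condition can be built by a small helper. This is a sketch: lastMinutesFilter is a hypothetical function (not part of the LOC SDK) that only produces a filter object in the shape used above, and it accepts now as a parameter so the result is deterministic. Ten minutes is 10 × 60 × 1000 = 600,000 milliseconds:

```javascript
// Hypothetical helper (not part of the LOC SDK): builds a "range" filter on
// the event timestamp covering the last `minutes` minutes, in milliseconds.
function lastMinutesFilter(minutes, now = Date.now()) {
  const windowMs = minutes * 60 * 1000; // minutes -> milliseconds
  return {
    field: "timestamp",
    gte: now - windowMs, // start of the window (inclusive)
    lte: now, // end of the window (inclusive)
    type: "range",
  };
}

// A fixed "now" makes the result easy to check.
const filter = lastMinutesFilter(10, 1_700_000_600_000);
console.log(filter.gte); // 1700000000000
console.log(filter.lte); // 1700000600000
```

The returned object could then go into the filters array of the query, for example filters: [lastMinutesFilter(10)].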

Invoke Data Process

note

Since this data process does not use a trigger payload, you can leave the payload empty when executing it as a single data process, or leave the HTTP request body empty when invoking it through an API route.

After executing the data process, it should return a result similar to that of the first policy application data process:

{
  "status": "ok",
  "taskId": {
    "id": "Vfe81Qml67opRaBfndhUBw",
    "executionId": "ZCKROD8ZFQi9d03qxp2n8Q"
  },
  "response": {
    "emitted_length": 3,
    "events": [
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0042",
        "targetDID": "Marvin",
        "meta": "P100",
        "type": "default"
      },
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0043",
        "targetDID": "Trillian",
        "meta": "P300",
        "type": "default"
      },
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0044",
        "targetDID": "Trillian",
        "meta": "P500",
        "type": "default"
      }
    ]
  }
}
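Each EventAssignAgent event in this result carries the request ID in sourceDID. Since policy-query may return the same EventPolicyRequest events on every run, one way to do the double-checking mentioned earlier is to drop requests whose ID already appears in an earlier assignment. A minimal sketch, assuming both lists have already been reduced to plain request IDs (extracting them from queried events is not covered here); skipAssigned and the request R0045 are hypothetical:

```javascript
// Hypothetical helper: keeps only requests that have no earlier assignment.
// `requests` are { requestId, policyCode } records; `assignedIds` are request
// IDs taken from previously emitted EventAssignAgent events (sourceDID).
function skipAssigned(requests, assignedIds) {
  const assigned = new Set(assignedIds);
  const seen = new Set();
  return requests.filter(({ requestId }) => {
    // Skip requests already assigned, and duplicates within this batch.
    if (assigned.has(requestId) || seen.has(requestId)) return false;
    seen.add(requestId);
    return true;
  });
}

const pending = skipAssigned(
  [
    { requestId: "R0042", policyCode: "P100" }, // already assigned
    { requestId: "R0045", policyCode: "P200" }, // new (hypothetical)
    { requestId: "R0045", policyCode: "P200" }, // duplicate in this batch
  ],
  ["R0042", "R0043", "R0044"],
);

console.log(pending.map((r) => r.requestId).join(", ")); // R0045
```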

Inspect Events

Finally, go back to Studio's Data Discovery/Event page to find the policy events (to see both types of events, we use a time range filter instead of searching by label name):

[Screenshot: policy events listed in Data Discovery/Event]
note

For now, Studio does not support multiple search conditions, so we simply ignore unrelated events here. However, you can consider adding a common prefix to the label names of related events, so that a specific group of events is easier to search for.

We can also switch to the data lineage graph to see all six events:

[Screenshot: data lineage graph of the six policy events]
note

You'll need to drag the nodes around to re-arrange the graph as above.

This clearly shows the data flow between the two data processes: applicants submitted requests for certain types of insurance policies, and each request was assigned to one agent. Such a lineage can extend across the whole business process and be used to find incorrect business mappings or missing links.
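Once the events are queried, such checks can also be scripted. The sketch below assumes requests and assignments have already been reduced to plain records (the field names are hypothetical) and uses the mock mapping table from this tutorial. findLineageIssues, also hypothetical, reports requests that never got an assignment (missing links), requests assigned more than once, and assignments to an agent who is not responsible for the policy code (incorrect mappings):

```javascript
// Hypothetical consistency check over the two event types in the lineage.
// requests:    [{ requestId, policyCode }]   (from EventPolicyRequest)
// assignments: [{ requestId, agent }]        (from EventAssignAgent)
// mapping:     { agentName: [policyCodes] }  (the mock mapping table)
function findLineageIssues(requests, assignments, mapping) {
  // Group assigned agents by request ID.
  const byRequest = new Map();
  for (const { requestId, agent } of assignments) {
    if (!byRequest.has(requestId)) byRequest.set(requestId, []);
    byRequest.get(requestId).push(agent);
  }

  const missing = [];
  const duplicated = [];
  const wrongAgent = [];
  for (const { requestId, policyCode } of requests) {
    const agents = byRequest.get(requestId) ?? [];
    if (agents.length === 0) {
      missing.push(requestId); // request never reached an agent
      continue;
    }
    if (agents.length > 1) duplicated.push(requestId);
    for (const agent of agents) {
      // The agent must be responsible for the requested policy type.
      if (!(mapping[agent] ?? []).includes(policyCode)) {
        wrongAgent.push({ requestId, agent, policyCode });
      }
    }
  }
  return { missing, duplicated, wrongAgent };
}

// Made-up data: R0042 goes to the wrong agent, R0044 is never assigned.
const mapping = { Marvin: ["P100", "P200"], Trillian: ["P300", "P400", "P500"] };
const issues = findLineageIssues(
  [
    { requestId: "R0042", policyCode: "P100" },
    { requestId: "R0043", policyCode: "P300" },
    { requestId: "R0044", policyCode: "P500" },
  ],
  [
    { requestId: "R0042", agent: "Trillian" },
    { requestId: "R0043", agent: "Trillian" },
  ],
  mapping,
);

console.log(issues.missing.join(", ")); // R0044
console.log(issues.wrongAgent.map((w) => w.requestId).join(", ")); // R0042
```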