Event Store Agent
Emit and query LOC data events.
Once emitted, events are stored and can be searched in Elasticsearch. They will be generated to data lineage graphs in Studio.
Availability
- ✓ Generic logic
- ✗ Aggregator logic
Emit Events
async eventStore.emit(events: Event.Event[]): Promise<void>
Emit event(s). The parameter events
is an array of events.
Type Event.Event
has to have the following fields:
Field | Type | Description |
---|---|---|
labelName | string | Label name (event name) |
sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
targetDigitalIdentity or targetDID | string | Target DID |
meta | string | Meta payload (additional data); max length 32768 bytes. |
type | string | Event type (group) |
For now the type
only supports "default"
.
The data lineage or data trail is represented by the relationship of
Source and target nodes will be created in Elasticsearch if not exist. Any nodes can be both source and target of other events.
Example
const events = [
{
// event 1
labelName: "Event name 1",
sourceDID: "Event source 1",
targetDID: "Event target 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Event source 2",
targetDID: "Event target 2",
meta: "",
type: "default",
},
// ...
];
await ctx.agents.eventStore.emit(events);
The events may not be properly emitted without using await
.
You can also use JSON.stringify()
to include a JSON object in the meta payload, and later decode it with JSON.parse()
.
Query Events
Query event(s) in Elasticsearch.
async eventStore.search(request: Search): Promise<SearchResult>
Parameter request
is of type Search
(all field are optional):
Due to the complexity of search options, we won't show the detail of type SearchRequest
and SearchResponse
here. Please refer to the examples.
The returned events (an array) will be at SearchRequest.events
. Each element has the type Event
:
Member | Type | Description |
---|---|---|
dataProcessIdentityContext | IdentityContext (see Context) | Data process ID and name |
logicIdentityContext | IdentityContext | Logic identity ID and name |
executionId | string | Execution ID |
taskId | string | Task ID |
sequence | number | Event sequence number |
label | Label , which is { id: string; name: string; } | Event label ID and name |
sourceDigitalIdentity | string | Source DID |
targetDigitalIdentity | string | Target DID |
meta | string | Meta payload |
type | string | Event group |
timestamp | string | Event emitted time |
Example: query events
- JavaScript
- TypeScript
// helper function
const createMatch = (field, value) => ({ Match: { field, value } });
const requests = {
queries: [
createMatch(
// match condition 1
"label_name", // field name
"event name", // value
),
// match condition 2...
],
excludes: [],
filters: [],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
// helper function
const createMatch = (field: string, value: string) => ({
Match: { field, value },
});
const requests = {
queries: [
createMatch(
// match condition 1
"label_name", // field name
"event name", // value
),
// match condition 2...
],
excludes: [],
filters: [],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
You can combine conditions in queries
, excludes
, filters
and sorts
for more precise search. These field names are also required even if you don't use them.
List of available query field names
Name | Description |
---|---|
data_process_permanent_identity | Data process permanent ID |
data_process_name | Data process name |
data_process_revision | Data process revision number |
logic_name | Logic name |
logic_permanent_identity | Logic permanent ID |
logic_revision | Logic revision number |
execution_id | Execution ID |
task_id | Task ID |
sequence | Event sequence number |
label_id | label ID |
label_name | Label name (labelName in emit) |
source_digital_identity | Source DID (sourceDID in emit) |
target_digital_identity | Target DID (targetDID in emit) |
type | Type (type in emit) |
Example: exclude events
- JavaScript
- TypeScript
const createMatch = (field, value) => ({ Match: { field, value } });
const requests = {
queries: [],
excludes: [
createMatch(
// match condition 1
"source_digital_identity", // field name
"source DID", // exclude events with this source DID
),
// match condition 2...
],
filters: [],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
const createMatch = (field: string, value: string) => ({
Match: { field, value },
});
const requests = {
queries: [],
excludes: [
createMatch(
// match condition 1
"source_digital_identity", // field name
"source DID", // exclude events with this source DID
),
// match condition 2...
],
filters: [],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
Example: filter events (compare)
- JavaScript
- TypeScript
const filterCondition = (field, type, value) => ({ Range: { field, value: { [type]: value } } });
const requests = {
queries: [],
excludes: [],
filters: [
filterCondition( // filter condition 1
"target_digital_identity", // field name
"gte" // compare type
9000, // compared value
), // filter events that target DIDs >= 9000
// filter condition 2...
],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
const filterCondition = (field: string, type: string, value: number) => ({ Range: { field, value: { [type]: value } } });
const requests = {
queries: [],
excludes: [],
filters: [
filterCondition( // filter condition 1
"target_digital_identity", // field name
"gte" // compare type
9000, // compared value
), // filter events that target DIDs >= 9000
// filter condition 2...
],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
The compared value has to be number
type. The compare type can be either "gte"
(greater than or equal) or "lte"
(less than or equal).
It would be a good idea to use timestamp
field to filter out older events so you won't have to process them twice.
Example: filter events (wildcard)
filters
can apply a wildcard search as well:
- JavaScript
- TypeScript
const filterConditionWildCard = (field, value) => ({
Wildcard: { field, value },
});
const requests = {
queries: [],
excludes: [],
filters: [
filterConditionWildCard(
// filter condition 1
"target_digital_identity", // field name
"target DID", // field name to be searched as wildcard
),
// filter condition 2...
],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
const filterConditionWildCard = (field: string, value: string) => ({
Wildcard: { field, value },
});
const requests = {
queries: [],
excludes: [],
filters: [
filterConditionWildCard(
// filter condition 1
"target_digital_identity", // field name
"target DID", // field name to be searched as wildcard
),
// filter condition 2...
],
from: 0,
size: 1000,
sorts: [],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
Example: sort events
- JavaScript
- TypeScript
const requests = {
queries: [],
excludes: [],
filters: [],
from: 0,
size: 1000,
sorts: [
{
// sort condition 1
field: "source_digital_identity",
orderBy: "Desc",
},
// sort condition 2...
],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
const requests = {
queries: [],
excludes: [],
filters: [],
from: 0,
size: 1000,
sorts: [
{
// sort condition 1
field: "source_digital_identity",
orderBy: "Desc" as "Asc" | "Desc",
},
// sort condition 2...
],
};
const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;
orderBy
can be either "Asc"
(ascending order) or "Desc"
(descending order). Works for numeric or non-numeric string values.
Query Event Sequences
Search sequence of events. The first event has to satisfy first search condition, and so on...
async eventStore.searchWithPattern(request: PatternRequest): Promise<PatternResponse>
Like search
, we'll skip the detail of type PatternRequest
and PatternResponse
.
Example
- JavaScript
- TypeScript
// helper function
const sequenceCondition = (field, value, type) => {
const operators = ["Eq", "NotEq", "Gt", "Lt", "Gte", "Lte"];
if (!operators.includes(type)) type = "Eq";
return { [type]: { field, value } };
};
// create sequence search pattern
const requests = {
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [sequenceCondition("label_name", "event name 1", "Eq")],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [sequenceCondition("label_name", "event name 2", "Eq")],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
};
const query = await ctx.agents.eventStore.searchWithPattern(requests);
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
// helper function and type
interface QueryMap {
field: string;
value: string;
[k: string]: unknown;
}
type Condition =
| { Eq: QueryMap }
| { NotEq: QueryMap }
| { Gt: QueryMap }
| { Lt: QueryMap }
| { Gte: QueryMap }
| { Lte: QueryMap };
const sequenceCondition = (field: string, value: string, type: string) => {
const operators = ["Eq", "NotEq", "Gt", "Lt", "Gte", "Lte"];
if (!operators.includes(type)) type = "Eq";
return { [type]: { field, value } } as Condition;
};
// create sequence search pattern
const requests = {
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [sequenceCondition("label_name", "event name 1", "Eq")],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [sequenceCondition("label_name", "event name 2", "Eq")],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
};
const query = await ctx.agents.eventStore.searchWithPattern(requests);
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
Available field names are the same as search
type
operator options in sequenceCondition includes:
Operator | Descriptionn |
---|---|
"Eq" | equal |
"NotEq" | not equal |
"Gt" | greater than |
"Lt" | less than |
"Gte" | greater than or equal |
"Lte" | less than or equal |