Version: LOC v0.9 (legacy)

Event Store Agent

import { EventAgent, Event, Search, Pattern } from "@fstnetwork/loc-logic-sdk";

Emit and query LOC data events.

Once emitted, events are stored and become searchable in Elasticsearch within a short time. Studio also renders them as data lineage graphs.

The data lineage or data trail is represented by the relationship of the graph below:

Availability

  • ✓ Generic logic
  • ✗ Aggregator logic

Emit Events

async EventAgent.emit(events: Event.Event[]): Promise<void>

Emit event(s). The parameter events is an array of events.

Event Schema

Type Event.Event has the following fields:

Field                               Type    Description
labelName                           string  Label name (event name)
sourceDigitalIdentity or sourceDID  string  Source digital identity (DID)
targetDigitalIdentity or targetDID  string  Target DID
meta                                string  Meta payload (additional data); max length 2^15 (32768) bytes
type                                string  Event type (group)

The input parameter/value of a would-be event is also referred to as the event schema.

info

For now type only supports "default".

Elements of events do not have to be of type Event.Event, but an error is thrown if the labelName, sourceDID or targetDID field is missing.

Source and target nodes are created in Elasticsearch if they do not exist yet. Any node can be both the source and the target of other events.
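Since a missing required field makes emit throw, it can help to check a batch before sending it. The following is a minimal sketch, not part of the SDK: findInvalidEvents is a hypothetical helper, and it treats empty strings as missing, which is stricter than the check described above.

```javascript
// Hypothetical helper (not an SDK function): list the events that lack a required field.
function findInvalidEvents(events) {
  const problems = [];
  events.forEach((event, index) => {
    const missing = [];
    // Assumption: empty strings are treated as missing here.
    if (!event.labelName) missing.push("labelName");
    // Either spelling of the DID fields is accepted, as in the schema table.
    if (!(event.sourceDID || event.sourceDigitalIdentity)) missing.push("sourceDID");
    if (!(event.targetDID || event.targetDigitalIdentity)) missing.push("targetDID");
    if (missing.length > 0) problems.push({ index, missing });
  });
  return problems;
}

// Illustrative input: the second event has no target DID.
const problems = findInvalidEvents([
  { labelName: "order created", sourceDID: "shop", targetDID: "warehouse", meta: "", type: "default" },
  { labelName: "order shipped", sourceDID: "warehouse", meta: "", type: "default" },
]);
console.log(problems);
```

If the returned array is empty, the batch can be passed to EventAgent.emit as usual.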

Example

await EventAgent.emit([
  {
    // event 1
    labelName: "Event name 1",
    sourceDID: "Event source 1",
    targetDID: "Event target 1",
    meta: "",
    type: "default",
  },
  {
    // event 2
    labelName: "Event name 2",
    sourceDID: "Event source 2",
    targetDID: "Event target 2",
    meta: "",
    type: "default",
  },
  // ...
]);
warning

Events may not be emitted properly if you omit await.

tip

You can also use JSON.stringify() to include a JSON object in the meta payload, and later decode it with JSON.parse().
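The round trip described in this tip could look like the sketch below. encodeMeta and decodeMeta are hypothetical helpers, not SDK functions. The sketch assumes the 32768-byte limit is measured in UTF-8 bytes and that a payload of exactly 32768 bytes is still accepted; neither detail is confirmed here.

```javascript
const META_MAX_BYTES = 32768; // 2^15, the documented maximum length of meta

// Hypothetical helper: serialize an object into a meta string and reject oversized payloads.
function encodeMeta(payload) {
  const meta = JSON.stringify(payload);
  // Assumption: the limit applies to the UTF-8 byte length, not the character count.
  const bytes = new TextEncoder().encode(meta).length;
  if (bytes > META_MAX_BYTES) {
    throw new RangeError(`meta is ${bytes} bytes; the limit is ${META_MAX_BYTES}`);
  }
  return meta;
}

// Hypothetical helper: decode a meta string; returns null for empty or non-JSON payloads.
function decodeMeta(meta) {
  if (!meta) return null;
  try {
    return JSON.parse(meta);
  } catch {
    return null;
  }
}

const meta = encodeMeta({ orderId: 42, items: ["a", "b"] });
const decoded = decodeMeta(meta);
console.log(meta, decoded.orderId);
```

The encoded string goes into the meta field of an emitted event, and decodeMeta can be applied to event.meta of a queried event.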

Query Events

Query event(s) in Elasticsearch.

async EventAgent.search(request: Search): Promise<SearchResult>

Parameter request is of type Search and the function returns type SearchResult.

Search Parameter

Type: Search

Member        Type                Description
queries?      Query[] | null      Event query conditions
excludes?     Query[] | null      Event exclude conditions
filters?      Filter[] | null     Event filter conditions
sort?         Sort[] | null       Event sort operator
aggregation?  Aggregation | null  Aggregation syntax
from?         number | null       Event query starts from
size?         number | null       Event query size
note
type Query =
  | {
      field: string;
      type: "match";
      value: string;
    }
  | {
      field: string;
      type: "match_phrase";
      value: string;
    }
  | {
      field: string;
      type: "term";
      value: string;
    };

The examples below explain these parameters.

List of available query fields
Name                             Description
data_process_permanent_identity  Data process permanent ID
data_process_name                Data process name
data_process_revision            Data process revision number
logic_name                       Logic name
logic_permanent_identity         Logic permanent ID
logic_revision                   Logic revision number
execution_id                     Execution ID
task_id                          Task ID
sequence                         Event sequence number
label_id                         Label ID
label_name                       Label name (labelName in emit)
source_digital_identity          Source DID (sourceDID in emit)
target_digital_identity          Target DID (targetDID in emit)
type                             Type (type in emit)
timestamp                        Event emitted time (Unix timestamp)

All fields are optional. aggregation is the syntax for getting metrics, statistics, or other analytics from Elasticsearch. It is an advanced feature that is not demonstrated here.
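The from and size members can be combined to read results page by page. The sketch below assumes that from is a zero-based offset into the result set, as it is in Elasticsearch; pageRequest is a hypothetical helper, not an SDK function.

```javascript
// Hypothetical helper: derive the search request for a zero-based page number.
// Assumption: `from` is the offset of the first event to return.
function pageRequest(baseRequest, page, pageSize) {
  return { ...baseRequest, from: page * pageSize, size: pageSize };
}

const baseRequest = {
  queries: [{ field: "label_name", type: "match", value: "your event name" }],
};

// Third page of 100 events: skips the first 200 results.
const thirdPage = pageRequest(baseRequest, 2, 100);
console.log(thirdPage.from, thirdPage.size);
```

Each derived request can then be passed to EventAgent.search; the base request itself is left unchanged.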

Search Result

Type: SearchResult

Member        Type                      Description
events        Event[]                   Queried events
count         number                    Number of events to be queried (size parameter from Search)
total         number                    Actual queried number of events
took          number                    Query time (milliseconds)
aggregation?  AggregationResult | null  Aggregation results

count and total are similar metrics from Elasticsearch using different APIs; you can ignore them and simply use events.length instead.

Queried Events

Type: Event

Each element of events is of type Event (different from the one used in emit):

Member                      Type                                  Description
dataProcessIdentityContext  VersionedIdentityContext              Data process ID and name
logicIdentityContext        VersionedIdentityContext              Logic identity ID and name
executionId                 string                                Execution ID
taskId                      string                                Task ID
sequence                    number                                Event sequence (the emit order in an array, starting from 0)
label                       Label ({ id: string; name: string; })  Event label ID and name
sourceDigitalIdentity       string                                Source DID
targetDigitalIdentity       string                                Target DID
meta                        string                                Meta payload
type                        string                                Event group
timestamp                   string                                Event emitted datetime (ISO 8601 string)

Each queried event, other than the basic fields, also contains info about the execution, task, logic and data process where it was emitted.
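Because every queried event carries its execution ID and sequence number, results can be regrouped by the execution that emitted them. The following is a rough sketch: groupByExecution is a hypothetical helper, and the sample events are made-up objects that only carry the members used here.

```javascript
// Hypothetical helper: group queried events by executionId.
function groupByExecution(events) {
  const groups = new Map();
  for (const event of events) {
    const list = groups.get(event.executionId) ?? [];
    list.push(event);
    groups.set(event.executionId, list);
  }
  // Within each execution, order events by their sequence number
  // (the position in the emitted array, according to the table above).
  for (const list of groups.values()) {
    list.sort((a, b) => a.sequence - b.sequence);
  }
  return groups;
}

// Made-up sample data shaped like queried events (most members omitted).
const sampleEvents = [
  { executionId: "exec-2", sequence: 0, label: { id: "l3", name: "c" } },
  { executionId: "exec-1", sequence: 1, label: { id: "l2", name: "b" } },
  { executionId: "exec-1", sequence: 0, label: { id: "l1", name: "a" } },
];

const groups = groupByExecution(sampleEvents);
for (const [executionId, list] of groups) {
  console.log(executionId, list.map((event) => event.label.name));
}
```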

Example: query events

const requests = {
  queries: [
    {
      field: "label_name", // field name
      type: "match", // matching operator
      value: "your event name", // value
    },
    // match condition 2...
  ],
  excludes: [],
  filters: [],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
};

const query = await EventAgent.search(requests);
// fall back to an empty array so the loop below is safe
const events = query?.events ?? [];

// iterate through events
events.forEach((event) => {
  const label_name = event.label.name;
  const meta = event.meta;
  // ...
});

type defines how the value is used to query the specified field:

Query type      Description
"match"         Any word in the field matches a word in your value. Standard full-text search. Suitable for most use cases.
"term"          The field matches your value exactly.
"match_phrase"  The words in the field match the words in your value, in the same order. (For example, value "has been" matches field "it has been raining".)
info

These query types come directly from Elasticsearch: match, term and match_phrase.
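To avoid typos in the type string, query objects can be produced by a small builder. This is only a sketch: buildQuery is a hypothetical helper rather than an SDK function, and it accepts just the three types listed above.

```javascript
const QUERY_TYPES = ["match", "match_phrase", "term"];

// Hypothetical helper: build one Query object and reject unknown query types.
function buildQuery(field, type, value) {
  if (!QUERY_TYPES.includes(type)) {
    throw new TypeError(`unsupported query type: ${type}`);
  }
  // The Query type declares value as a string, so other values are converted.
  return { field, type, value: String(value) };
}

const queries = [
  buildQuery("label_name", "match", "your event name"),
  buildQuery("execution_id", "term", "your execution ID"),
];
console.log(queries);
```

The resulting array can be used as the queries or excludes member of a search request.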

tip

Events require a little bit of time to be stored into Elasticsearch. If you query events almost immediately after they are emitted, EventAgent.search may return an empty result.

One workaround is to use a timed loop:

let events = [];
const start = Date.now();
let now = Date.now();

// keep querying for up to 30 seconds
do {
  const query = await EventAgent.search({
    queries: [
      {
        field: "label_name",
        type: "match",
        value: "label name",
      },
    ],
    excludes: [],
    filters: [],
    sorts: [],
    aggregation: null,
    from: 0,
    size: 1000,
  });
  // fall back to an empty array if nothing came back
  events = query?.events ?? [];
  now = Date.now();
  // stop as soon as events are found or 30 seconds have passed
} while (events.length === 0 && now - start < 30000);

The example above keeps querying events until something is returned or 30 seconds have passed.
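The same idea can be wrapped in a reusable function that pauses between attempts instead of querying in a tight loop. This is a sketch under assumptions: pollForEvents is a hypothetical helper, the search argument is any async function that resolves to a result with an events array (for example a thin wrapper around EventAgent.search), and setTimeout is assumed to be available in the runtime.

```javascript
// Hypothetical helper: retry a search until it returns events or the timeout is reached.
async function pollForEvents(search, request, { timeoutMs = 30000, intervalMs = 500 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const result = await search(request);
    const events = result?.events ?? [];
    if (events.length > 0) return events;
    // Give up when the next attempt would start after the deadline.
    if (Date.now() + intervalMs > deadline) return [];
    // Assumption: setTimeout exists in the runtime; wait before the next attempt.
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Usage inside a logic (not executed here, since it needs the SDK):
// const events = await pollForEvents((req) => EventAgent.search(req), request);
```

An empty array as the return value means that no events showed up before the timeout.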

Example: exclude events

const query = await EventAgent.search({
  queries: [],
  excludes: [
    {
      // exclude condition 1
      field: "source_digital_identity",
      type: "match",
      value: "your source DID",
    },
    // exclude condition 2...
  ],
  filters: [],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

Example: filter events (range)

If a field holds numeric data, you can filter events by a range on that field:

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [
    {
      // filter condition 1
      field: "target_digital_identity", // field name
      gte: 9000, // value greater than or equal to
      lte: null, // value smaller than or equal to
      type: "range",
    },
    // filter condition 2...
  ],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

Both gte and lte are optional and can be set to either a number or null. The example above queries events whose target_digital_identity is greater than or equal to 9000.

tip

When filtering events within a time range using the timestamp field, convert the times to Unix timestamps in milliseconds. For example:

filters: [
  // filter events for the past hour
  {
    field: "timestamp",
    gte: Date.now() - 60 * 60 * 1000, // starts from 1 hour ago (= 60 * 60 * 1000 ms)
    lte: Date.now(),
    type: "range",
  },
],
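The arithmetic in this tip can be kept in one place with a small function. timeRangeFilter is a hypothetical helper, not an SDK function; it reads the clock once so that gte and lte refer to the same instant.

```javascript
// Hypothetical helper: build a range filter covering the last `durationMs` milliseconds.
// `now` can be passed in explicitly, which makes the result reproducible.
function timeRangeFilter(durationMs, now = Date.now()) {
  return {
    field: "timestamp",
    gte: now - durationMs,
    lte: now,
    type: "range",
  };
}

const ONE_HOUR_MS = 60 * 60 * 1000;

// Filter for the past hour, relative to the current time.
const pastHour = timeRangeFilter(ONE_HOUR_MS);
console.log(pastHour.lte - pastHour.gte); // 3600000
```

The returned object goes into the filters array of a search request.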

Example: filter events (wildcard)

filters can also apply a wildcard search to string fields, using the wildcard operators listed after the example:

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [
    {
      // filter condition 1
      field: "target_digital_identity", // field name
      type: "wildcard",
      value: "some?name*", // wildcard value
    },
    // filter condition 2...
  ],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

Wildcard operator  Description
?                  Matches any single character
*                  Matches zero or more characters, including none

For example, event-?-* matches event-A-1 and event-B-123, and so on.
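To preview which names a wildcard value would match before running a query, the two operators can be translated into a regular expression. This is a local approximation for experimenting only, not how Elasticsearch evaluates the filter; wildcardToRegExp is a hypothetical helper.

```javascript
// Hypothetical helper: translate a wildcard value into an anchored RegExp.
function wildcardToRegExp(pattern) {
  const source = pattern
    .split("")
    .map((ch) => {
      if (ch === "?") return "."; // exactly one character
      if (ch === "*") return ".*"; // zero or more characters
      // Escape characters that have a special meaning in regular expressions.
      return ch.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
    })
    .join("");
  // The "s" flag lets "." match line breaks as well.
  return new RegExp(`^${source}$`, "s");
}

const matcher = wildcardToRegExp("event-?-*");
console.log(matcher.test("event-A-1")); // true
console.log(matcher.test("event-B-123")); // true
console.log(matcher.test("event-AB-1")); // false: "?" stands for exactly one character
```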

Example: sort events

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [],
  sorts: [
    {
      // sort condition 1
      field: "source_digital_identity",
      order: "desc",
    },
    // sort condition 2...
  ],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

order can be

  • "asc" (ascending order)
  • "desc" (descending order)

Sorting works for both numeric data and non-numeric strings (which are sorted alphabetically).

Query Event Sequences

Search for sequences of events. The first event must satisfy the first search condition, the second event the second condition, and so on.

async EventAgent.searchWithPattern(request: Pattern): Promise<PatternResult>

Sequence Search Parameter

Type: Pattern

Member     Type        Description
sequences  Sequence[]  Sequence of conditions
filter?    Filter[]    Filter conditions (see the filter examples above)
maxSpan?   string      Search time span (for example, 30s = 30 seconds and 15m = 15 minutes)

Sequence Parameter

Type: Sequence

Member         Type                Description
conditions?    Condition[] | null  Sequence query conditions
sharedFields?  string[] | null
type?          string | null

The field names available in conditions are the same as those for search(). See the example for details.

Sequence Search Result

Type: PatternResult

Member     Type               Description
sequences  SequencesResult[]  Sequence of queried events
count      number             Number of events to be queried
total      number             Actual queried number of events
took       number             Query time (milliseconds)

Returned Sequence

Type: SequencesResult

PatternResult contains an array of such sequences, each of which contains one or more events:

Member    Type      Description
events    Event[]   Queried events
joinKeys  string[]

Example

// create sequence search pattern
const query = await EventAgent.searchWithPattern({
  sequences: [
    // must have at least two event conditions!
    {
      // sequence 1 event condition
      conditions: [
        {
          field: "label_name", // field name
          op: "eq", // operator
          value: "label name", // value
        },
      ],
      sharedFields: [],
      type: "any",
    },
    {
      // sequence 2 event condition
      conditions: [
        {
          field: "source_digital_identity",
          op: "gt",
          value: "source DID",
        },
        {
          field: "target_digital_identity",
          op: "lt",
          value: "target DID",
        },
      ],
      sharedFields: [],
      type: "any",
    },
  ],
  filter: null,
  maxSpan: null,
});

const sequences = query?.sequences;

// iterate through sequences
sequences.forEach((sequence) => {
  // iterate through events in each sequence
  sequence.events?.forEach((event) => {
    const label_name = event.label.name;
    const meta = event.meta;
    // ...
  });
});

The op field in conditions accepts the following operators:

Operator  Description
"eq"      equal to
"ne"      not equal to
"gt"      greater than
"lt"      less than
"gte"     greater than or equal to
"lte"     less than or equal to
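A pattern request can also be assembled from small builders that check the operator names and the minimum of two sequences. The following is a sketch only: condition and buildPattern are hypothetical helpers, and the sharedFields, type, filter and maxSpan values simply mirror the example above.

```javascript
const OPERATORS = ["eq", "ne", "gt", "lt", "gte", "lte"];

// Hypothetical helper: build one condition and reject unknown operators.
function condition(field, op, value) {
  if (!OPERATORS.includes(op)) {
    throw new TypeError(`unknown operator: ${op}`);
  }
  return { field, op, value };
}

// Hypothetical helper: build a Pattern from groups of conditions, one group per sequence.
function buildPattern(conditionGroups, maxSpan = null) {
  if (conditionGroups.length < 2) {
    throw new RangeError("a pattern needs at least two sequences");
  }
  return {
    // sharedFields and type are copied from the documented example.
    sequences: conditionGroups.map((conditions) => ({
      conditions,
      sharedFields: [],
      type: "any",
    })),
    filter: null,
    maxSpan,
  };
}

const pattern = buildPattern(
  [
    [condition("label_name", "eq", "first label name")],
    [condition("label_name", "eq", "second label name")],
  ],
  "30s",
);
console.log(JSON.stringify(pattern, null, 2));
```

The resulting object can be passed to EventAgent.searchWithPattern.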