Version: LOC v0.10 (legacy)

Event Store Agent

import { EventAgent, Event, Search, Pattern } from "@fstnetwork/loc-logic-sdk";

Emit and query LOC data events.

Once emitted, events are stored in Elasticsearch and become searchable within a very short time. They are also rendered as data lineage graphs in Studio.

The data lineage or data trail is represented by the relationship of the graph below:

Availability

  • ✓ Generic logic
  • ✗ Aggregator logic

Emit Events

async EventAgent.emit(events: Event.Event[]): Promise<void>

Emit event(s). The parameter events is an array of events.

Event Schema

Type Event.Event has the following fields:

| Field | Type | Description |
| --- | --- | --- |
| labelName | string | Label name (event name) |
| sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
| targetDigitalIdentity or targetDID | string | Target DID |
| meta | string | Meta payload (additional data); max length 2^15 (32768) bytes |
| type | string | Event type (group) |

The input parameter/value of a would-be event is also referred to as the event schema.

info

For now type only supports "default".

Elements of events do not have to be of type Event.Event, but an error is thrown if the labelName, sourceDID or targetDID field is missing.

Source and target nodes are created in Elasticsearch if they do not exist yet. Any node can be both a source and a target of other events.
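
Since emit() throws when a required field is missing, it can be useful to check events before emitting them. The following is a minimal sketch only: findInvalidEvents is a hypothetical helper (not part of the SDK), and it assumes that either spelling of the DID fields from the schema table is accepted and that an empty string counts as missing.

```javascript
// Hypothetical helper, not part of the LOC SDK: returns the indexes of
// events that lack a label name, a source DID or a target DID.
function findInvalidEvents(events) {
  // Assumption: an empty string is treated the same as a missing field.
  const isFilled = (value) => typeof value === "string" && value.length > 0;
  const invalid = [];
  events.forEach((event, index) => {
    const hasLabel = isFilled(event.labelName);
    // Assumption: either spelling from the schema table is acceptable.
    const hasSource =
      isFilled(event.sourceDID) || isFilled(event.sourceDigitalIdentity);
    const hasTarget =
      isFilled(event.targetDID) || isFilled(event.targetDigitalIdentity);
    if (!(hasLabel && hasSource && hasTarget)) invalid.push(index);
  });
  return invalid;
}
```

A caller could emit only when findInvalidEvents(events) returns an empty array, and log the offending indexes otherwise.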

Example

await EventAgent.emit([
  {
    // event 1
    labelName: "Event name 1",
    sourceDID: "Event source 1",
    targetDID: "Event target 1",
    meta: "",
    type: "default",
  },
  {
    // event 2
    labelName: "Event name 2",
    sourceDID: "Event source 2",
    targetDID: "Event target 2",
    meta: "",
    type: "default",
  },
  // ...
]);
warning

The events may not be properly emitted without using await.

tip

You can also use JSON.stringify() to include a JSON object in the meta payload, and later decode it with JSON.parse().
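
As a rough illustration of this tip, the sketch below wraps JSON.stringify() and JSON.parse() and checks the payload against the 32768-byte limit from the event schema. encodeMeta and decodeMeta are hypothetical names, not SDK functions; the sketch assumes the limit is counted in UTF-8 bytes and that TextEncoder is available in the runtime.

```javascript
// Hypothetical helpers, not part of the LOC SDK.
const MAX_META_BYTES = 2 ** 15; // 32768 bytes, the limit stated in the event schema

// Serialise an object for the meta field and reject oversized payloads.
function encodeMeta(payload) {
  const text = JSON.stringify(payload);
  // Count bytes rather than characters: non-ASCII characters take more
  // than one byte in UTF-8 (assumption: the limit is measured this way).
  const bytes = new TextEncoder().encode(text).length;
  if (bytes > MAX_META_BYTES) {
    throw new RangeError(
      `meta payload is ${bytes} bytes; the limit is ${MAX_META_BYTES}`
    );
  }
  return text;
}

// Parse the meta string of a queried event; returns null when the meta
// payload is empty or is not valid JSON.
function decodeMeta(meta) {
  if (!meta) return null;
  try {
    return JSON.parse(meta);
  } catch {
    return null;
  }
}
```

The result of encodeMeta() would go into the meta field when emitting, and decodeMeta(event.meta) would be applied to each queried event.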

Query Events

Query event(s) in Elasticsearch.

async EventAgent.search(request: Search): Promise<SearchResult>

Parameter request is of type Search and the function returns type SearchResult.

Search Parameter

Type: Search

| Member | Type | Description |
| --- | --- | --- |
| queries? | Query[] \| null | Event query conditions |
| excludes? | Query[] \| null | Event exclude conditions |
| filters? | Filter[] \| null | Event filter conditions |
| sorts? | Sort[] \| null | Event sort operator |
| aggregation? | Aggregation \| null | Aggregation syntax |
| from? | number \| null | Event query starts from |
| size? | number \| null | Event query size |

All fields are optional. aggregation is the syntax for getting metrics, statistics, or other analytics from Elasticsearch. It is an advanced feature that we will not demonstrate here.
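
The examples on this page spell out every member of the request each time. As one possible shortcut, the sketch below fills in defaults so that only the relevant members need to be stated. buildSearch is a hypothetical helper (not part of the SDK), and the default values simply mirror the examples on this page.

```javascript
// Hypothetical helper, not part of the LOC SDK: fills in the optional
// members of a search request so callers only state what they need.
function buildSearch(overrides = {}) {
  return {
    queries: [],
    excludes: [],
    filters: [],
    sorts: [], // spelled as in the examples on this page
    aggregation: null,
    from: 0,
    size: 1000,
    ...overrides, // caller-supplied members replace the defaults
  };
}
```

The result can be passed directly to EventAgent.search(), for example buildSearch({ size: 10 }).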

note
type Query =
  | {
      field: string; // event field - see the list of available field names below
      type: "match"; // match operator
      value: string; // field value to be matched
    }
  | {
      field: string;
      type: "match_phrase";
      value: string;
    }
  | {
      field: string;
      type: "term";
      value: string;
    };

These querying methods are directly from Elasticsearch: match, term and match_phrase:

| type (match operator) | Description |
| --- | --- |
| "match" | Any word in the field matches your value (fuzzy search). Standard full-text search; suitable for most use cases. |
| "term" | The field matches your value exactly (precise search). |
| "match_phrase" | The words in your value match words in the field, in the same order. (For example, the value "has been" matches the field "it has been raining".) |

See query events example.

List of available field names for query, filter or sort

| Name | Description |
| --- | --- |
| label_id | Label ID |
| label_name | Label name (labelName in emit()) |
| source_digital_identity | Source DID (sourceDID in emit()) |
| target_digital_identity | Target DID (targetDID in emit()) |
| type | Type (type in emit()) |
| sequence | Event sequence number (the emit order in an array, starting from 0) |
| timestamp | Event emitted time (unix timestamp) |
| execution_id | Execution ID |
| task_id | Task ID |
| data_process_permanent_identity | Data process permanent ID |
| data_process_name | Data process name |
| data_process_revision | Data process revision number |
| logic_name | Logic name |
| logic_permanent_identity | Logic permanent ID |
| logic_revision | Logic revision number |
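
A misspelled field name or match operator may be easy to overlook when writing conditions by hand. The sketch below builds a single Query object and fails early on values not listed on this page. makeQuery and both constant lists are hypothetical, not part of the SDK.

```javascript
// Hypothetical helper, not part of the LOC SDK.
const QUERY_TYPES = ["match", "match_phrase", "term"];
const FIELD_NAMES = [
  "label_id", "label_name", "source_digital_identity",
  "target_digital_identity", "type", "sequence", "timestamp",
  "execution_id", "task_id", "data_process_permanent_identity",
  "data_process_name", "data_process_revision", "logic_name",
  "logic_permanent_identity", "logic_revision",
];

// Build one query condition, failing early on a misspelled field or operator.
function makeQuery(field, type, value) {
  if (!FIELD_NAMES.includes(field)) throw new Error(`unknown field: ${field}`);
  if (!QUERY_TYPES.includes(type)) throw new Error(`unknown query type: ${type}`);
  // The Query type declares value as a string, so coerce other inputs.
  return { field, type, value: String(value) };
}
```

The returned objects can be placed in the queries or excludes arrays of a search request.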

Search Result

Type: SearchResult

| Member | Type | Description |
| --- | --- | --- |
| events | Event[] | Queried events |
| count | number | Number of events to be queried (size parameter from Search) |
| total | number | Actual queried number of events |
| took | number | Query time (milliseconds) |
| aggregation? | AggregationResult \| null | Aggregation results |

count and total are similar metrics from Elasticsearch using different APIs; you can ignore them and simply use events.length instead.
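
When more events are expected than fit into one request, from and size can in principle be combined to read results page by page. The sketch below shows one way to do that. It assumes that from is a zero-based offset (as in Elasticsearch) and that the result order is stable between calls, which may require a sort condition. searchAllPages is a hypothetical helper, and agent stands for any object with a search(request) method, such as EventAgent.

```javascript
// Hypothetical helper, not part of the LOC SDK. `agent` is anything with a
// search(request) method, such as EventAgent.
async function searchAllPages(agent, request, pageSize = 500, maxEvents = 5000) {
  const collected = [];
  let from = 0; // assumption: zero-based offset into the result set
  while (collected.length < maxEvents) {
    const result = await agent.search({ ...request, from, size: pageSize });
    const page = result?.events ?? [];
    collected.push(...page);
    // A page shorter than pageSize means there is nothing left to fetch.
    if (page.length < pageSize) break;
    from += pageSize;
  }
  // Never return more than the caller asked for.
  return collected.slice(0, maxEvents);
}
```

The maxEvents cap keeps the loop bounded if the store keeps returning full pages.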

List of examples:

Example: query events

const requests = {
  queries: [
    {
      field: "label_name", // field name
      type: "match", // matching operator
      value: "your event name", // value
    },
    // match condition 2...
  ],
  excludes: [],
  filters: [],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
};

const query = await EventAgent.search(requests);
const events = query?.events ?? []; // fall back to an empty array if nothing is returned

// iterate through events
events.forEach((event) => {
  const label_name = event.label.name;
  const meta = event.meta;
  // ...
});
tip

Events require a little bit of time to be stored into Elasticsearch. If you query events almost immediately after they are emitted, EventAgent.search may return an empty result.

One workaround is to use a timed loop:

let events = [];
const start = Date.now();
let now = Date.now();

// wait as long as 30 seconds to query events
do {
  const query = await EventAgent.search({
    queries: [
      {
        field: "label_name",
        type: "match",
        value: "label name",
      },
    ],
    excludes: [],
    filters: [],
    sorts: [],
    aggregation: null,
    from: 0,
    size: 1000,
  });
  events = query?.events ?? []; // keep events an array even if nothing is returned
  now = Date.now();
  // keep looping only while nothing was found AND the 30 seconds are not up
} while (events.length === 0 && now - start < 30000);

The example above keeps querying events until something is returned or 30 seconds have elapsed.
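
A timed loop of this kind can also be wrapped in a reusable function that pauses between attempts instead of querying back to back. The following is a sketch under stated assumptions: waitForEvents is a hypothetical helper, agent stands for any object with a search(request) method (such as EventAgent), and setTimeout is assumed to be available in the runtime. If it is not, a different sleep function can be passed in.

```javascript
// Hypothetical helper, not part of the LOC SDK.
// Assumption: setTimeout exists in the runtime; pass your own `sleep` if not.
const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitForEvents(agent, request, options = {}) {
  const { timeoutMs = 30000, intervalMs = 500, sleep = defaultSleep } = options;
  const deadline = Date.now() + timeoutMs;
  while (true) {
    const result = await agent.search(request);
    const events = result?.events ?? [];
    if (events.length > 0) return events;
    // Give up once the deadline has passed; otherwise pause and retry.
    if (Date.now() >= deadline) return [];
    await sleep(intervalMs);
  }
}
```

The function resolves to an empty array on timeout, so the caller can tell a timeout apart from a match by checking the array length.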

Example: exclude events

const query = await EventAgent.search({
  queries: [],
  excludes: [
    {
      // exclude condition 1
      field: "source_digital_identity",
      type: "match",
      value: "your source DID",
    },
    // exclude condition 2...
  ],
  filters: [],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

Example: filter events (range)

If a field of certain events is numeric data, you can apply a filter range:

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [
    {
      // filter condition 1
      field: "target_digital_identity", // field name
      gte: 9000, // value greater than or equal to
      lte: null, // value smaller than or equal to
      type: "range",
    },
    // filter condition 2...
  ],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;
tip

When filtering events within a time range on the timestamp field, convert the time to a Unix timestamp in milliseconds (the unit returned by Date.now()). For example:

filters: [ // filter events for the past hour
  {
    field: "timestamp",
    gte: Date.now() - 60 * 60 * 1000, // starts from 1 hour ago (= 60 * 60 * 1000 ms)
    lte: Date.now(),
    type: "range",
  },
],
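
When the boundaries come from Date objects or ISO 8601 strings rather than Date.now(), the conversion can be kept in one place. The sketch below is one way to do it. timeRangeFilter is a hypothetical helper, and it assumes the timestamp field is compared in milliseconds, as the tip above implies.

```javascript
// Hypothetical helper, not part of the LOC SDK: builds a range filter on the
// timestamp field from Date objects, ISO 8601 strings or millisecond numbers.
function timeRangeFilter(from, to = new Date()) {
  const toMillis = (value) => {
    const ms = value instanceof Date ? value.getTime() : new Date(value).getTime();
    if (Number.isNaN(ms)) throw new TypeError(`invalid time: ${value}`);
    return ms;
  };
  // Assumption: the timestamp field is compared in milliseconds.
  return { field: "timestamp", type: "range", gte: toMillis(from), lte: toMillis(to) };
}
```

The returned object can be placed in the filters array of a search request.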

Example: filter events (wildcard)

filters can also apply a wildcard search on string fields, using the following wildcard operators: ? matches any single character, and * matches zero or more characters.

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [
    {
      // filter condition 1
      field: "target_digital_identity", // field name
      type: "wildcard",
      value: "some?name*", // wildcard value
    },
    // filter condition 2...
  ],
  sorts: [],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;
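
To get a feel for what a wildcard value such as "some?name*" would match, the pattern can be tried against sample strings locally. The sketch below converts a wildcard pattern into a regular expression, treating ? as exactly one character and * as zero or more characters. wildcardToRegExp is a hypothetical helper and only an approximation: the actual matching is done by Elasticsearch and may differ, for example in case sensitivity.

```javascript
// Hypothetical helper, not part of the LOC SDK: turns a wildcard pattern into
// a RegExp so patterns can be tried against sample strings locally.
function wildcardToRegExp(pattern) {
  const source = pattern
    .split("")
    .map((ch) => {
      if (ch === "?") return "."; // exactly one character
      if (ch === "*") return ".*"; // zero or more characters
      return ch.replace(/[.+^${}()|[\]\\]/g, "\\$&"); // escape regex metacharacters
    })
    .join("");
  // Anchor at both ends so the whole string has to match the pattern.
  return new RegExp(`^${source}$`, "s");
}
```

For instance, wildcardToRegExp("some?name*").test(sample) reports whether a sample DID would fit the pattern under these assumptions.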

Example: sort events

const query = await EventAgent.search({
  queries: [],
  excludes: [],
  filters: [],
  sorts: [
    {
      // sort condition 1
      field: "source_digital_identity",
      order: "desc",
    },
    // sort condition 2...
  ],
  aggregation: null,
  from: 0,
  size: 1000,
});

const events = query?.events;

Queried Events

Type: Event

Each element of events is of type Event (different from the one used in emit):

| Member | Type | Description |
| --- | --- | --- |
| label | Label, which is { id: string; name: string; } | Event label ID and name |
| sourceDigitalIdentity | string | Source DID |
| targetDigitalIdentity | string | Target DID |
| meta | string | Meta payload |
| type | string | Event group |
| sequence | number | Event sequence (the emit order in an array, starting from 0) |
| timestamp | string | Event emitted datetime (ISO 8601 string) |
| executionId | string | Execution ID |
| taskId | string | Task ID |
| dataProcessIdentityContext | VersionedIdentityContext | Data process ID and name |
| logicIdentityContext | VersionedIdentityContext | Logic identity ID and name |

Each queried event, other than the basic fields, also contains info about the execution, task, logic and data process where it was emitted.
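
As a small illustration of working with these fields, the sketch below flattens a queried event into a plain record and orders events by emission time. summarizeEvent and sortByEmission are hypothetical helpers (not part of the SDK) that rely only on the members listed in the table above.

```javascript
// Hypothetical helper, not part of the LOC SDK: flattens a queried event into
// a plain record that is easier to log or compare.
function summarizeEvent(event) {
  return {
    label: event.label?.name ?? "",
    source: event.sourceDigitalIdentity,
    target: event.targetDigitalIdentity,
    // timestamp is an ISO 8601 string in queried events; convert it to a Date.
    emittedAt: new Date(event.timestamp),
    executionId: event.executionId,
    taskId: event.taskId,
  };
}

// Order events by emission time, then by their position in the emit() array.
// Returns a new array; the input is left untouched.
function sortByEmission(events) {
  return [...events].sort(
    (a, b) =>
      Date.parse(a.timestamp) - Date.parse(b.timestamp) || a.sequence - b.sequence
  );
}
```

Sorting locally like this is an alternative to a sort condition in the request when the events have already been fetched.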

Query Event Sequences

Search for a sequence of events. The first event has to satisfy the first search condition, the second event the second condition, and so on.

async EventAgent.searchWithPattern(request: Pattern): Promise<PatternResult>

Sequence Search Parameter

Type: Pattern

| Member | Type | Description |
| --- | --- | --- |
| sequences | Sequence[] | Sequence of conditions |
| filter? | Filter[] | Filter conditions (same as filters in search()) |
| maxSpan? | string | Search time span (for example, 30s = 30 secs and 15m = 15 mins) |

Sequence Parameter

Type: Sequence

| Member | Type | Description |
| --- | --- | --- |
| conditions? | Condition[] \| null | Sequence query conditions |
| sharedFields? | string[] \| null | |
| type? | string \| null | |

The field names available in conditions are the same as those for search(). See the example for details.

Sequence Search Result

Type: PatternResult

| Member | Type | Description |
| --- | --- | --- |
| sequences | SequencesResult[] | Sequence of queried events |
| count | number | Number of events to be queried |
| total | number | Actual queried number of events |
| took | number | Query time (milliseconds) |

Returned Sequence

Type: SequencesResult

PatternResult contains an array of such sequences; each sequence contains one or more events:

| Member | Type | Description |
| --- | --- | --- |
| events | Event[] | Queried events |
| joinKeys | string[] | |

Example

// create sequence search pattern
const query = await EventAgent.searchWithPattern({
  sequences: [
    // must have at least two event conditions!
    {
      // sequence 1 event condition
      conditions: [
        {
          field: "label_name", // field name
          op: "eq", // operator
          value: "label name", // value
        },
      ],
      sharedFields: [],
      type: "any",
    },
    {
      // sequence 2 event condition
      conditions: [
        {
          field: "source_digital_identity",
          op: "gt",
          value: "source DID",
        },
        {
          field: "target_digital_identity",
          op: "lt",
          value: "target DID",
        },
      ],
      sharedFields: [],
      type: "any",
    },
  ],
  filter: null,
  maxSpan: null,
});

const sequences = query?.sequences ?? []; // fall back to an empty array if nothing is returned

// iterate through sequences
sequences.forEach((sequence) => {
  // iterate through events in each sequence
  sequence.events?.forEach((event) => {
    const label_name = event.label.name;
    const meta = event.meta;
    // ...
  });
});
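
Pattern requests are deeply nested, so a pair of small builders may make them easier to read. The sketch below is one possibility: condition and buildPattern are hypothetical helpers (not part of the SDK). They accept only the op values this page lists (eq, ne, gt, lt, gte, lte), enforce the rule of at least two event conditions, and reuse the sharedFields and type values from the example above.

```javascript
// Hypothetical helpers, not part of the LOC SDK.
const OPERATORS = ["eq", "ne", "gt", "lt", "gte", "lte"];

// One condition inside a sequence; rejects operators this page does not list.
function condition(field, op, value) {
  if (!OPERATORS.includes(op)) throw new Error(`unknown operator: ${op}`);
  return { field, op, value };
}

// Assemble a Pattern request from two or more lists of conditions.
function buildPattern(conditionLists, maxSpan = null) {
  if (conditionLists.length < 2) {
    throw new Error("a sequence search needs at least two event conditions");
  }
  return {
    sequences: conditionLists.map((conditions) => ({
      conditions,
      sharedFields: [], // same values as the example above
      type: "any",
    })),
    filter: null,
    maxSpan, // for example "30s" or "15m"
  };
}
```

The result of buildPattern() has the same shape as the object passed to EventAgent.searchWithPattern() in the example.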

The op operator in conditions accepts the following values:

| Operator | Description |
| --- | --- |
| "eq" | equal to |
| "ne" | not equal to |
| "gt" | greater than |
| "lt" | less than |
| "gte" | greater than or equal to |
| "lte" | less than or equal to |