node-stream-lambda
A stream lambda class for building data apps that process stream events.
Requirements
- node.js >= 12.0.0
Getting started
Installation
npm i @corva/node-stream-lambda
Features
- Defines stream workflow
Workflow
A stream lambda has 3 phases:
- pre-process (groups events by asset id and filters already processed records)
- process
- post-process (saves last processed records info to redis)
@startuml
(*) -->[{event}] "preProcess"
if "Exception?" then
-->[true, {error}] "postProcess"
else
-->[false, {event}] "process"
-->[{data, error}] "postProcess"
endif
-->[End] (*)
@enduml
Usually you only need to define a process function to handle the data. Pre-process is used to modify incoming data in some way. Post-process is used to handle processed data and/or to handle and log errors thrown during pre-process or process.
The class also exposes a .run method that is used to launch processing.
Configuration
Stream config
/**
* App name, in most cases should match application name
*/
name: string;
/**
* Provider name
*/
provider: string;
/**
* Maps keys to camelCase if true
* @default false
*/
mapKeys?: boolean;
/**
* Filtering settings allow filtering out already processed events to avoid duplicates; disabled by default
*/
filter?: {
/**
* Filter incoming records by last processed timestamp (will work only for time-based data)
* @default false
*/
byLastProcessedTimestamp?: boolean;
/**
* Filter incoming records by last processed depth (will work only for depth-based data)
* @default false
*/
byLastProcessedDepth?: boolean;
};
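As an illustration, a config that enables key mapping and timestamp-based filtering might look like this (all values are made up for the example):

```javascript
// Illustrative stream config; name and provider are required,
// the rest falls back to the defaults described above.
const streamConfig = {
  name: 'wits-depth-summary',
  provider: 'my-oil-company',
  mapKeys: true,
  filter: {
    byLastProcessedTimestamp: true,
  },
};
```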
Pre-process
v2
Stream lambda pre-process does several preparation steps:
- Converts the "old" and "new" stream event formats into a common style. You can find the new event format in examples;
- Detects the end of "drilling" and adds the flag Symbol.for('completed') to records that are passed to process in the event parameter;
- Groups event records by asset_id and calls process for each asset_id individually. That could result in multiple process and post-process calls when there are multiple asset_ids in the events; however, that is not the usual case;
- Filters out events that are "already processed" based on the state from the previous run (see post-process for details).
v3+
- Optionally maps keys to camelCase and adds assetId, appConnectionId and isCompleted fields to the event;
- Filters out events that are "already processed" based on the state from the previous run (see post-process for details).
Event transformation
Events that come in new format:
[
{
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]
will be transformed into:
[
{
"assetId": 1,
"appConnectionId": 123,
"isCompleted": false,
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]
That is what the process handler receives in its event parameter.
Post-process
In post-process, the stream lambda saves info about the last processed record into its state, which is stored in Redis.
This prevents already processed records from being processed several times.
If process throws an error, this step is skipped and the state is not changed.
The cache key for the state looks like this:
/**
* provider - company's identifier, e.g. my-oil-company
* assetId - well's identifier, taken from the event
* appStreamId - taken from the event
* appKey - app's unique identifier, taken from app config
* appConnectionId - taken from event
*/
`${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`
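Putting the template together with concrete values (all made up for the example) gives a key like this:

```javascript
// Illustration of how the state cache key is assembled from the
// values described above.
const provider = 'my-oil-company';
const assetId = 1;
const appStreamId = 456;
const appKey = 'corva.wits-depth-summary';
const appConnectionId = 123;

const cacheKey =
  `${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`;
// 'my-oil-company/well/1/stream/456/corva.wits-depth-summary/123'
```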
The value is a hash map, so use hset, hget, hmget and hgetall to access the values you need. By default it sets state.lastProcessedTimestamp for time-based data and state.lastProcessedDepth for depth-based data.
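The duplicate filtering that pre-process performs against this state can be sketched as follows (a simplified illustration; the library's actual comparison logic, e.g. around equal timestamps, may differ):

```javascript
// Simplified sketch of filtering already-processed records using the
// lastProcessedTimestamp stored in the Redis-backed state.
function filterProcessed(records, state) {
  const last =
    state.lastProcessedTimestamp == null ? -Infinity : state.lastProcessedTimestamp;
  return records.filter((record) => record.timestamp > last);
}

const state = { lastProcessedTimestamp: 1546300900 };
const records = [
  { timestamp: 1546300800 }, // already processed, dropped
  { timestamp: 1546300900 }, // already processed, dropped
  { timestamp: 1546301000 }, // new, kept
];
const fresh = filterProcessed(records, state);
// fresh contains only the record with timestamp 1546301000
```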
Examples
const { StreamLambda } = require('@corva/node-stream-lambda');
// you may need an API client and other libs
const { ApiClient } = require('@corva/node-api-client');

const apiClient = new ApiClient();

const lambda = new StreamLambda({
  apiClient,
  streamConfig: { name: 'my-app', provider: 'my-provider' },
  process: async ({ event, context }) => {
    // event is a transformed stream event (see the Event section below)
    const depths = event.records.map((record) => record.data.hole_depth);
    return Math.max(...depths);
  },
});
Event
Here's an example of a stream event that will be passed to the lambda:
[
{
"metadata": {
"apps": {
"corva.wits-depth-summary": {
"app_connection_id": 123
}
},
"app_stream_id": 456
},
"records": [
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Some unnecessary drilling that's excluded"
}
},
{
"asset_id": 1,
"timestamp": 1546300800,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.4,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546300900,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.5,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301000,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 99.9,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301100,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.3,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301200,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.5,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
},
{
"asset_id": 1,
"timestamp": 1546301300,
"company_id": 24,
"version": 1,
"data": {
"hole_depth": 100.6,
"weight_on_bit": 1,
"state": "Rotary Drilling"
}
}
]
}
]