This package has been deprecated

Author message:

This package has been replaced with @corva/node-sdk

@corva/node-stream-lambda

5.0.0 • Public • Published

node-stream-lambda

A stream lambda class for building data apps that handle stream events.

Requirements

  • node.js - >=12.0.0

Getting started

Installation

npm i @corva/node-stream-lambda

Features

  • Defines stream workflow

Workflow

A stream lambda run has 3 phases:

  • pre-process (groups events by asset id and filters already processed records)
  • process
  • post-process (saves info about the last processed records to Redis)

@startuml
(*) -->[{event}] "preProcess"

if "Exception?" then
  -->[true, {error}] "postProcess"
else
  -->[false, {event}] "process"
  -->[{data, error}] "postProcess"
endif
-->[End] (*)
@enduml

Usually you only need to define the process function to handle the data. Pre-process modifies the incoming data before it reaches process. Post-process handles the processed data and/or handles and logs errors that were thrown during pre-process or process.

The class also exposes a .run method that launches processing.
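
The flow in the diagram above can be sketched in plain JavaScript. This is only an illustration of the control flow, not the package's implementation: runWorkflow and the handler signatures are hypothetical names chosen for the example.

```javascript
// Minimal sketch of the three-phase flow (hypothetical helper, not the package's code).
// The caller supplies all three handlers.
async function runWorkflow({ preProcess, process: processFn, postProcess }, event) {
  let prepared;
  try {
    prepared = await preProcess({ event });
  } catch (error) {
    // Pre-process failed: skip process and go straight to post-process.
    return postProcess({ error });
  }

  let data;
  let error;
  try {
    data = await processFn({ event: prepared });
  } catch (err) {
    error = err;
  }
  // Post-process always runs and receives either the result or the error.
  return postProcess({ data, error });
}
```

Whatever happens in the first two phases, post-process runs exactly once, which makes it a natural place for logging and state updates.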

Configuration

Stream config

/**
* App name, in most cases should match application name
*/
name: string;
/**
* Provider name
*/
provider: string;
/**
* Maps keys to camelCase if true
* @default false
*/
mapKeys?: boolean;
/**
* Filtering settings that filter out already processed events to avoid duplicates; disabled by default
*/
filter?: {
  /**
    * Filter incoming records by last processed timestamp (will work only for time-based data)
    * @default false
    */
  byLastProcessedTimestamp?: boolean;
  /**
    * Filter incoming records by last processed depth (will work only for depth-based data)
    * @default false
    */
  byLastProcessedDepth?: boolean;
};
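
As an illustration, a config that follows the shape above might look like the sketch below. The values are made up, and withDefaults is a hypothetical helper that only mirrors the documented defaults; it is not part of the package.

```javascript
// Example stream config; all values are placeholders.
const exampleStreamConfig = {
  name: 'wits-depth-summary',
  provider: 'my-oil-company',
  mapKeys: true,
  filter: {
    byLastProcessedTimestamp: true,
  },
};

// Hypothetical helper: fills in the documented defaults for omitted options.
function withDefaults(config) {
  return {
    mapKeys: false,
    ...config,
    filter: {
      byLastProcessedTimestamp: false,
      byLastProcessedDepth: false,
      ...config.filter,
    },
  };
}
```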

Pre-process

v2

Stream lambda pre-process performs several preparation steps:

  1. Converts the "old" and "new" stream event formats into a common style. You can find the new event format in the examples.
  2. Detects the end of "drilling" and adds the flag Symbol.for('completed') to the records that are passed to process in the event parameter.
  3. Groups event records by asset_id and calls process for each asset_id individually. This can result in multiple process and post-process calls when the events contain multiple asset_ids, although that is not the usual case.
  4. Filters out events that were already processed, based on the state from the previous run (see post-process for details).
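
The grouping and filtering steps can be sketched as follows. Both helpers are hypothetical and written only to illustrate the idea; in particular, keeping records with a timestamp strictly greater than the last processed one is an assumption about the filter's semantics.

```javascript
// Groups records by asset_id into a Map of asset_id -> records.
function groupByAssetId(records) {
  const groups = new Map();
  for (const record of records) {
    if (!groups.has(record.asset_id)) {
      groups.set(record.asset_id, []);
    }
    groups.get(record.asset_id).push(record);
  }
  return groups;
}

// Drops records at or before the last processed timestamp.
// With no saved state (undefined), every record is kept.
function filterProcessed(records, lastProcessedTimestamp) {
  if (lastProcessedTimestamp === undefined) {
    return records;
  }
  return records.filter((record) => record.timestamp > lastProcessedTimestamp);
}
```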

v3+

  1. Optionally maps keys to camelCase and adds the assetId, appConnectionId and isCompleted fields to the event.
  2. Filters out events that were already processed, based on the state from the previous run (see post-process for details).

Event transformation

Events that come in new format:

[
  {
    "metadata": {
      "apps": {
        "corva.wits-depth-summary": {
          "app_connection_id": 123
        }
      },
      "app_stream_id": 456
    },
    "records": [
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Some unnecessary drilling that's excluded"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      }
    ]
  }
]

will be transformed into:

[
  {
    "assetId": 1,
    "appConnectionId": 123,
    "isCompleted": false,
    "metadata": {
      "apps": {
        "corva.wits-depth-summary": {
          "app_connection_id": 123
        }
      },
      "app_stream_id": 456
    },
    "records": [
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Some unnecessary drilling that's excluded"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      }
    ]
  }
]

This is what the process handler receives in its event parameter.
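
A rough sketch of this transformation is shown below. It is an illustration under assumptions, not the package's code: transformEvent is a hypothetical name, the asset id is taken from the first record, the app key is passed in by the caller, and completion detection is not shown, so isCompleted is simply a parameter.

```javascript
// Hypothetical helper that adds the three top-level fields to one event.
function transformEvent(event, appKey, isCompleted = false) {
  const [firstRecord] = event.records;
  return {
    // Assumption: every record in the event belongs to the same asset.
    assetId: firstRecord ? firstRecord.asset_id : undefined,
    appConnectionId: event.metadata.apps[appKey].app_connection_id,
    isCompleted,
    // The original metadata and records are kept unchanged.
    ...event,
  };
}
```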

Post-process

In post-process, the stream lambda saves the last processed record to its state, which is stored in Redis. This prevents records from being processed several times. If process throws an error, this step is skipped and the state is not changed.

The cache key for the state has the following form:

/**
 * provider - company's identifier, i.e. my-oil-company
 * assetId - well's identifier, taken from the event
 * appStreamId - taken from the event
 * appKey - app's unique identifier, taken from app config
 * appConnectionId - taken from event
 */
`${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`

The value is a hash map, so use hset, hget, hmget and hgetall to access the values you need. By default, post-process sets state.lastProcessedTimestamp for time-based data and state.lastProcessedDepth for depth-based data.
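
For example, the key for a given event could be assembled as in the sketch below. buildStateKey is a hypothetical helper that follows the template above, and the input values are taken from the sample events in this document plus the placeholder provider my-oil-company.

```javascript
// Hypothetical helper that builds the state cache key from its parts.
function buildStateKey({ provider, assetId, appStreamId, appKey, appConnectionId }) {
  return `${provider}/well/${assetId}/stream/${appStreamId}/${appKey}/${appConnectionId}`;
}

const exampleStateKey = buildStateKey({
  provider: 'my-oil-company',
  assetId: 1,
  appStreamId: 456,
  appKey: 'corva.wits-depth-summary',
  appConnectionId: 123,
});
// exampleStateKey === 'my-oil-company/well/1/stream/456/corva.wits-depth-summary/123'
```

With a key like this, the state can be inspected with the Redis hash commands mentioned above, for example HGETALL on the key.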

Examples

const { StreamLambda } = require('@corva/node-stream-lambda');

// you may need api client and other libs
const { ApiClient } = require('@corva/node-api-client');
const apiClient = new ApiClient();

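const lambda = new StreamLambda({
  apiClient,
  // name and provider are required; the values below are placeholders
  streamConfig: { name: 'my-app', provider: 'my-oil-company' },
  process: async ({ event, context }) => {
    // event.records holds the records left after pre-process
    const result = event.records.length;
    return result;
  },
});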

Event

Here's an example of a stream event that is passed to the lambda:

[
  {
    "metadata": {
      "apps": {
        "corva.wits-depth-summary": {
          "app_connection_id": 123
        }
      },
      "app_stream_id": 456
    },
    "records": [
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Some unnecessary drilling that's excluded"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546300800,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.4,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546300900,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.5,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546301000,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 99.9,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546301100,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 100.3,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546301200,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 100.5,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      },
      {
        "asset_id": 1,
        "timestamp": 1546301300,
        "company_id": 24,
        "version": 1,
        "data": {
          "hole_depth": 100.6,
          "weight_on_bit": 1,
          "state": "Rotary Drilling"
        }
      }
    ]
  }
]
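
To give an idea of what a process handler might do with such an event, here is a small sketch that summarizes the hole depth of the "Rotary Drilling" records. summarizeDrilling is a hypothetical helper, and the choice of summary fields is made up for illustration.

```javascript
// Hypothetical helper: summarizes hole depth over "Rotary Drilling" records.
function summarizeDrilling(event) {
  const depths = event.records
    .filter((record) => record.data.state === 'Rotary Drilling')
    .map((record) => record.data.hole_depth);

  if (depths.length === 0) {
    return null; // nothing to summarize
  }
  return {
    count: depths.length,
    minDepth: Math.min(...depths),
    maxDepth: Math.max(...depths),
  };
}

const exampleSummary = summarizeDrilling({
  records: [
    { asset_id: 1, timestamp: 1546300800, data: { hole_depth: 99.4, state: 'Rotary Drilling' } },
    { asset_id: 1, timestamp: 1546300900, data: { hole_depth: 99.5, state: 'Rotary Drilling' } },
  ],
});
// exampleSummary is { count: 2, minDepth: 99.4, maxDepth: 99.5 }
```

A helper like this could be called from the process function with the event it receives.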
