NYPL Streams Client

This is a helper module for reading from and writing to NYPL streams, with or without Avro encoding.

Installation

Install it via npm for use inside your project:

npm i @nypl/nypl-streams-client --save

Usage

const NyplStreamsClient = require('@nypl/nypl-streams-client')
const streamsClient = new NyplStreamsClient({ nyplDataApiClientBase: 'http://example.com/api/v0.1/' })

See docs/usage.md for complete documentation of Client methods and use.

Example 1: Writing data to a stream

To write a single record to a stream (encoded using the "MyStream" schema):

streamsClient.write('MyStream', { id: 'id1', field1: 1, field2: 2 }).then((resp) => {
  console.log(`Finished writing ${resp.Records.length} record(s) to stream`)
}).catch((e) => console.error('Error writing to stream: ', e))

To write multiple records to a stream, batched and rate-limited to avoid write errors:

const records = [ { id: 'id1', field1: 1, field2: 2 }, { id: 'id2', field1: 1 }, ... ] // Array of any length
const options = {
  recordsPerSecond: 500 // This is the default and well below the 1000/s AWS constraint
}
streamsClient.write('MyStream', records, options).then((resp) => {
  console.log(`Finished writing ${resp.Records.length} record(s) to stream`)
  console.log(`Failed to write: ${resp.FailedRecordCount} record(s)`)
}).catch((e) => console.error('Error writing to stream: ', e))

The above will resolve after roughly records.length / 500 seconds (more generally, records.length / recordsPerSecond). The resolved value is a single hash merged from the hashes returned by each underlying putRecords call.
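
For reference, here is a minimal sketch of inspecting that resolved value, assuming it mirrors the AWS Kinesis PutRecords response fields (Records, FailedRecordCount) used above:

const NyplStreamsClient = require('@nypl/nypl-streams-client')
const streamsClient = new NyplStreamsClient({ nyplDataApiClientBase: 'http://example.com/api/v0.1/' })

const records = [ { id: 'id1', field1: 1, field2: 2 }, { id: 'id2', field1: 1 } ]

streamsClient.write('MyStream', records, { recordsPerSecond: 500 })
  .then((resp) => {
    if (resp.FailedRecordCount > 0) {
      // Some records were rejected; log the count and decide whether to retry
      console.warn(`${resp.FailedRecordCount} of ${records.length} record(s) failed to write`)
    } else {
      console.log(`All ${resp.Records.length} record(s) written successfully`)
    }
  })
  .catch((e) => console.error('Error writing to stream: ', e))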

Example 2: Decoding data obtained from a stream

The streams client can be used to decode data read directly from a stream (e.g. via a Lambda Kinesis event source).

Example Lambda handler with a Kinesis trigger:

const NyplStreamsClient = require('@nypl/nypl-streams-client')

exports.handler = function (event, context, callback) {
  // Initialize streams client:
  const streamsClient = new NyplStreamsClient({ nyplDataApiClientBase: 'http://example.com/api/v0.1/' })
  const firstRecord = event.Records[0]

  if (firstRecord.kinesis) {
    // decodeData returns a Promise resolving the decoded record(s):
    const decodedKinesisData = streamsClient.decodeData('SchemaName', event.Records.map((record) => record.kinesis.data))

    // Do something with the decoded data once the Promise resolves
    return decodedKinesisData
      .then((result) => console.log('result:', result))
      .catch((err) => console.log('rejected:', err))
  }
}
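
Assuming decodeData resolves an array of decoded objects parallel to the array of kinesis.data values passed in, the resolution step inside the handler above could instead process each record and complete the invocation via the Lambda callback. This is a sketch only; the decoded.id field is illustrative and the actual fields depend on the named schema:

return decodedKinesisData
  .then((decodedRecords) => {
    decodedRecords.forEach((decoded) => {
      // Fields on each decoded record depend on the Avro schema; `id` is illustrative
      console.log('Decoded record id:', decoded.id)
    })
    callback(null, `Processed ${decodedRecords.length} record(s)`)
  })
  .catch((err) => callback(err))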

CLI

The library includes a CLI for writing arbitrary events to streams. Care should be taken to construct events that conform to the relevant schema.

For example, to write a SierraBibRetrievalRequest encoded event to the SierraBibRetriever-qa stream:

cli/nypl-streams.js --envfile config/qa.env --profile nypl-digital-dev write SierraBibRetriever-qa --schemaName SierraBibRetrievalRequest '{ "id": "21747246" }'

Git workflow

  • Cut a feature branch from master.
  • Create a PR to merge the feature branch into master.
  • After the PR has been approved by multiple co-workers, the author merges it.

Publishing to NPMJS

Once the PR has been approved and merged, check out the target branch locally and:

  1. Bump the version:
  • Bump the version number in package.json
  • Run nvm use; npm i to update package-lock.json
  • Commit the changes
  • Tag the commit in git (e.g. git tag -a v2.1.1)
  • Push the changes to origin (including tags, via git push --tags)
  2. Publish the changes to NPMJS:
  • Run npm publish --dry-run to verify nothing is being packaged that should not be!
  • Run npm publish

Testing

npm test
