@terryweiss/cosmostream
1.1.8 • Public • Published

CosmoStream

A library to realize CosmosDB queries as Node streams.

Streams are very handy things. They allow you to deal with large (long) data sets in a way that is memory friendly and, depending on the source, very fast. CosmosDB needs streams desperately because its documented examples present fetchAll()/toArray() as the favored, and occasionally the only, solution. Arrays are scary because when they get populated by some other process, you never know how much data you will have to worry about. Streams are more predictable and, for the scenarios where they are a best practice, a lot easier to reason about. They are not suitable for every Cosmos interaction, but for the ones where they make sense, this library should work well.

Throughout this document, we will consider the use case of an ETL (Extract, Transform, Load) job. In this scenario we will read from one Cosmos source (extract), modify each record (transform, which this document calls "translate") and put it in another Cosmos source (load), like so:


+----------------+      +---------------------------+   +-----------------------+
| Stage/Products +----> + Add fields to each record +--> + Write to dev/api_test |
+----------------+      +---------------------------+   +-----------------------+

Installation

Until we work out our package repo system (Azure Artifacts? npm org?), it is being hosted on my npm org. Cosmostream ships with TypeScript typings.

yarn add @terryweiss/cosmostream # using yarn
npm install @terryweiss/cosmostream --save # good ol' NPM

Usage

Cosmostream works at the database level. The first thing you will need to do is establish the connection properties for your database. Specifically cosmostream needs key and endpoint definitions. Cosmostream gives you two ways to do this. If you are just connecting to one database the whole time, set the COSMOS_END_POINT and COSMOS_KEY environment variables. If you connect to multiple databases, each call to the connection class can have its own key and endpoint.

const { Collection, Connection, Writer } = require( "@terryweiss/cosmostream" );

const cartConn = new Connection( "goCart", "endpointendpointendpoint", "keykeyey" );
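
The two configuration styles described above can be sketched as one small resolver that prefers explicit values and falls back to the environment. This is a minimal sketch: resolveCosmosSettings is a hypothetical helper, not part of cosmostream, and it makes no claim about how the library itself reads COSMOS_END_POINT and COSMOS_KEY.

```javascript
// Hypothetical helper (not part of cosmostream): pick the endpoint and key
// from explicit arguments first, then from the environment variables
// COSMOS_END_POINT and COSMOS_KEY named in the text above.
function resolveCosmosSettings(explicit = {}, env = process.env) {
    const endpoint = explicit.endpoint || env.COSMOS_END_POINT;
    const key = explicit.key || env.COSMOS_KEY;

    // Fail early with a readable message instead of a confusing auth error later.
    const missing = [];
    if (!endpoint) missing.push("endpoint (or COSMOS_END_POINT)");
    if (!key) missing.push("key (or COSMOS_KEY)");
    if (missing.length > 0) {
        throw new Error("Missing Cosmos settings: " + missing.join(", "));
    }

    return { endpoint, key };
}

// Usage with the Connection signature shown above:
// const { endpoint, key } = resolveCosmosSettings();
// const cartConn = new Connection("goCart", endpoint, key);
```

Failing before the connection is created keeps a missing variable from surfacing as a less obvious error deeper in the pipeline.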

Now with that we can start to do things. Let's get a collection over that connection. You can set up the collection reference manually, or you can use a convenience method provided by the library. We'll do the latter. The fromContainerName method returns a promise that resolves to a collection, in this case products.

const products = await Collection.fromContainerName( cartConn, "products" );

The Collection instance contains all sorts of goodies. In this case we want a read stream, so we will ask the collection to run a query and return a stream for us.

const readStream = await products.queryStream( "select * from c where c.location_id = -1", {} );

The queryStream method returns a promise that resolves to a Readable stream. This stream delivers one record at a time to you. We now have products ready in our example:


+----------------+      +---------------------------+   +-----------------------+
| Stage/Products +----> + Add fields to each record +--> + Write to dev/api_test |
+----------------+      +---------------------------+   +-----------------------+
 ^^^^^^^^^^^^^^^^
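
To get a feel for what "one record at a time" means, here is a small sketch that uses a plain Node Readable as a stand-in for the stream that queryStream resolves to. The fakeProductStream and collectIds names are made up for illustration. Whether a cosmostream read stream can be iterated this way (it is started with go() later in this document) is an assumption that has not been checked against the library.

```javascript
const { Readable } = require("stream");

// Stand-in for the result of queryStream: any object-mode Readable will do.
function fakeProductStream() {
    return Readable.from([
        { id: "a", location_id: -1 },
        { id: "b", location_id: -1 },
    ]);
}

// Readable streams are async iterables, so records can be pulled one at a
// time without ever holding the whole result set in an array.
async function collectIds(stream) {
    const ids = [];
    for await (const record of stream) {
        ids.push(record.id);
    }
    return ids;
}

collectIds(fakeProductStream()).then((ids) => console.log(ids));
```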

Let's hook up the other end for writing to the dev instance, api-test collection. We'll create a Writable stream for this.

const devConn = new Connection( "dev", "endpointendpointendpoint", "keykeyey"  );
const apiTest = await Collection.fromContainerName( devConn, "api-test" );
const mungeWriter = new Writer( apiTest.items );

+----------------+      +---------------------------+   +-----------------------+
| Stage/Products +----> + Add fields to each record +--> + Write to dev/api_test |
+----------------+      +---------------------------+   +-----------------------+
 ^^^^^^^^^^^^^^^^                                        ^^^^^^^^^^^^^^^^^^^^^^^^  
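
For readers new to Writable streams, the sketch below shows the general shape of an object-mode sink. It is illustrative only and is not cosmostream's Writer: RecordSink and saveRecord are hypothetical names, with saveRecord standing in for whatever async call persists one record.

```javascript
const { Writable } = require("stream");

// Illustrative only: NOT cosmostream's Writer. `saveRecord` is a hypothetical
// async function that persists a single record and returns a promise.
class RecordSink extends Writable {
    constructor(saveRecord) {
        super({ objectMode: true }); // accept plain objects, not Buffers
        this.saveRecord = saveRecord;
    }

    _write(record, enc, callback) {
        // Calling back only after the save settles is what lets the stream
        // apply back-pressure to whatever is piped into it.
        this.saveRecord(record).then(() => callback(), callback);
    }
}

// Usage: an in-memory "database" for demonstration.
// const saved = [];
// const sink = new RecordSink(async (r) => { saved.push(r); });
```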

The "translate" portion of ETL is all that is left. Since we are reading from one stream and writing to another, we'll create a Transform stream. They are very simple. You implement just one method: _transform. This method is called exactly once per record in the stream. The parameters passed to it, from left to right, describe the flow: you get data in the chunk, and you pass what you create to the callback, which forwards it on to the next stream.

const {Transform} = require("stream");

class MyTransform extends Transform{
    constructor(){
        super({objectMode:true}); // this tells the stream to create an object for each record, otherwise it will be a Buffer.
    }

    _transform(chunk, enc, callback){ // we will ignore the encoding parameter since it has no effect in objectMode
        // add a field
        chunk.lookAtMe = "I am a field";
        // calculate something
        const six = 1+2+3;
        // add it to the record
        chunk.theNumber = six;
        
        // now we can pass the value downstream 
        callback(null, chunk); 

    }   
}

When _transform fires, the stream is logically paused while you do what you gotta do. It only "resumes" when you call callback, so async work can happen so long as you call callback at the right time (when your async work is complete). _transform can be declared async, which lets you await function calls inside it, but you can't return anything from it: results still go downstream through callback.
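
The async case can be sketched like this. lookupPrice is a hypothetical stand-in for real async work (an HTTP call, another database read), and AsyncTransform is just an illustrative name. The point to notice is that callback is called exactly once per record, either with the result or with the error.

```javascript
const { Transform } = require("stream");

// Hypothetical async lookup standing in for real work.
async function lookupPrice(record) {
    return record.id.length * 10;
}

class AsyncTransform extends Transform {
    constructor() {
        super({ objectMode: true });
    }

    async _transform(chunk, enc, callback) {
        let price;
        try {
            // The stream waits here: no further records arrive until
            // callback has been called.
            price = await lookupPrice(chunk);
        } catch (err) {
            callback(err); // report the failure to the stream
            return;
        }
        chunk.price = price;
        callback(null, chunk); // pass the enriched record downstream
    }
}
```

Keeping the callback outside the try block avoids calling it a second time if something downstream throws.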


+----------------+      +---------------------------+   +-----------------------+
| Stage/Products +----> + Add fields to each record +--> + Write to dev/api_test |
+----------------+      +---------------------------+   +-----------------------+
 ^^^^^^^^^^^^^^^^        ^^^^^^^^^^^^^^^^^^^^^^^^^      ^^^^^^^^^^^^^^^^^^^^^^^^  

Now let's put the pipeline together:

const transform = new MyTransform();

readStream.pipe(transform).pipe(mungeWriter);

The pipe method connects a read stream to another stream. It creates a pipeline that manages back-pressure and other details for you. This is truly epic. One line defines the entirety of the work and data flow, and you can build it out and extend it as necessary. There are a number of excellent stream utility suites. Cosmostream uses mississippi, which is worth a look.

Also, open your mind real wide for highland (from the guy that gave us async.js) and consider how a pure data stream might work as a component of a functional work flow.

And then there's gulp where you could automate builds and configure them from data streams. Hmmmm. Makes you think.
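
Node's own stream module also has a helper in this spirit. The sketch below rebuilds the three-stage shape with stand-in streams (no Cosmos involved) and wires it up with stream.pipeline, which behaves like chained pipe() calls but also reports an error from any stage to a single callback. This is a swapped-in technique for illustration, not what cosmostream or the example above uses, and runDemo is a made-up name.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream");

function runDemo(records, done) {
    const written = [];

    // extract: an in-memory source instead of a Cosmos query
    const source = Readable.from(records);

    // translate: add a field to each record
    const addField = new Transform({
        objectMode: true,
        transform(chunk, enc, callback) {
            callback(null, { ...chunk, seen: true });
        },
    });

    // load: collect records instead of writing to Cosmos
    const sink = new Writable({
        objectMode: true,
        write(chunk, enc, callback) {
            written.push(chunk);
            callback();
        },
    });

    // An error in any stage tears the pipeline down and lands in this callback.
    pipeline(source, addField, sink, (err) => done(err, written));
}

runDemo([{ id: 1 }, { id: 2 }], (err, written) => {
    if (err) throw err;
    console.log("records written:", written.length);
});
```

Plain pipe() does not forward errors between stages, so each stream would otherwise need its own error handler.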

And we'll make it run:

readStream.go();

Now data is flowing.


+----------------+      +---------------------------+   +-----------------------+
| Stage/Products +----> + Add fields to each record +--> + Write to dev/api_test |
+----------------+      +---------------------------+   +-----------------------+
 ^^^^^^^^^^^^^^^^        ^^^^^^^^^^^^^^^^^^^^^^^^^      ^^^^^^^^^^^^^^^^^^^^^^^^  
  -------------->-------------->-------------->-------------->-------------->--->

Here's the whole thing in one shot. Notice how little setup there is, particularly how few lines of code are needed to read from or write to Cosmos.

const { Transform } = require( "stream" );
const { Collection, Connection, Writer } = require( "@terryweiss/cosmostream" );

async function useCosmoStream() {
	// products
	const cartConn = new Connection( "goCart", process.env.PRODUCTS_END_POINT, process.env.PRODUCTS_KEY );
	const products = await Collection.fromContainerName( cartConn, "products" );
	const readStream = await products.queryStream( "select * from c where c.location_id = -1", {} );

	// api-test
	const devConn = new Connection( "dev", process.env.API_TEST_ENDPOINT, process.env.API_TEST_KEY );
	const apiTest = await Collection.fromContainerName( devConn, "api-test" );
	const mungeWriter = new Writer( apiTest.items );

	// translate
	const transform = new MyTransform();

	// process data, make profit
	readStream.pipe(transform).pipe(mungeWriter);
	readStream.go();
}

class MyTransform extends Transform{
	constructor(){
		super({objectMode:true}); // this tells the stream to create an object for each record, otherwise it will be a Buffer.
	}

	_transform(chunk, enc, callback){ // we will ignore the encoding parameter since it has no effect in objectMode
		// add a field
		chunk.lookAtMe = "I am a field";
		// calculate something
		const six = 1+2+3;
		// add it to the record
		chunk.theNumber = six;

		// now we can pass the value downstream
		callback(null, chunk);

	}
}

// kick it off; log anything that goes wrong during setup
useCosmoStream().catch( console.error );

Testing

The tests currently use tape and still need to be migrated to mocha. :) To run them, create a .env file in /test and then run node on the test file.

cd test/ && node test.js

.env:

PRODUCTS_KEY=q2MkXXXXXXXXXXXXXXXXXXXXXXXXX==
PRODUCTS_END_POINT=https://XXXXXXXXXXXXXXXXXXXXXXXX:443/
API_TEST_KEY=GEXXXXXXXXXXXXXXXXXXXXXXXXX==
API_TEST_ENDPOINT=https://XXXXXXXXXXXXXXXXXXXXXXXXX:443/
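
This document does not say how test.js loads the file, so the following is only a sketch of what reading such a file involves. parseEnv is a hypothetical helper. The detail worth noticing is that each line is split on the first "=" only, because keys like the ones above end in "==".

```javascript
// Minimal KEY=VALUE parser; a sketch, not a replacement for a real loader.
function parseEnv(text) {
    const vars = {};
    for (const line of text.split(/\r?\n/)) {
        const trimmed = line.trim();
        // Skip blank lines and comments.
        if (trimmed === "" || trimmed.startsWith("#")) continue;

        const eq = trimmed.indexOf("=");
        if (eq === -1) continue; // not a KEY=VALUE line

        // Split on the FIRST "=" only, so trailing "==" stays in the value.
        const name = trimmed.slice(0, eq).trim();
        vars[name] = trimmed.slice(eq + 1).trim();
    }
    return vars;
}

// Usage (assumes a .env file in the current directory):
// const fs = require("fs");
// Object.assign(process.env, parseEnv(fs.readFileSync(".env", "utf8")));
```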

Documentation

Documentation for the API, formatted for easy reading on GitHub (till I get around to GitHub Pages), can be had on GitHub. When you install, the package will include a docs/html directory with local documentation.

License

UNLICENSED
