streamlined

Collection of helper streams to deal with mapping/transforming object streams

streamlined

Collection of helper streams to deal with mapping/transforming object streams

This module is installed via npm:

$ npm install streamlined

Use #limit() or #head() to limit the number of data events emitted to the first 5:

var sl = require('streamlined');
var count = 0;
myObjectModeStream()
  .pipe(sl.limit(5))
  .on('data', function (data) {
    // should only print the first 5 data events 
    console.log(data);
  });

Useful to writing to leveldb.

Provide the path of the key as the first arg of the #key function, and it will be made the key field, and the body of the data event will be made the value field:

var sl = require('streamlined');
myObjectModeStream()
  .pipe(sl.key('properties.time')
  .on('data', function (data) {
    // `data` will look like { key: properties.time, value: originalData } 
  });

Counts the number of objects in a stream with #count:

var sl = require('streamlined');
myObjectModeStream()
  .pipe(sl.count())
  .on('data', function (data) {
    console.log(data);
    // prints: { count: 1838 } 
  })

Keeps track of the total byte size of the objects flowing through based on the JSON stringified size. Doesn't include crlf or surrounding array characters, however:

var sl = require('streamlined'),
    JSONStream = require('JSONStream'),
    fs = require('fs');
myObjectModeStream()
  .pipe(sl.limitBytes(1024))
  .pipe(JSONStream.stringify('', '', ''))
  // output.json won't be bigger than 1024 bytes 
  .pipe(fs.createWriteStream('output.json'));

Uses the #tail function to only return the last n items in an objects stream:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.tail(5))
  .on('data', function (data) {
    // should only print the last 5 data events 
    console.log(data);
  });

Say you have a stream of web log events, and want to count how many users use different web browsers. You can use the #distinct function and pass it the path of the browser property, and get the answer!:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.distinct('properties.$browser'))
  .on('data', function (data) {
    // data should be: 
    /***
     * data should be:
        {
          Safari: 156,
          Firefox: 349,
          Chrome: 822,
          'Mobile Safari': 229,
          'Internet Explorer': 160,
          'Android Mobile': 76,
          'Opera Mini': 6,
          'Chrome iOS': 26,
          undefined: 2,
          'Facebook Mobile': 2,
          Opera: 8,
          BlackBerry: 2
        };
    */
  })

Use the #missing stream to only show objects that don't contain a field, given a path to the field:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.missing('properties.$browser'))
  .on('data', function (data) {
    // only events that are missing the `properties.$browser` field will be 
    // printed 
    console.log('data');
  })

Given an array of object paths, retrieve those fields and return them as an array of values as a 'row':

var sl = require('streamlined'),
myObjectModeStream()
var results = [];
events
  .pipe(sl.tail(5))
  .pipe(sl.select(['properties.time', 'properties.distinct_id']))
  .on('data', function (data) {
    console.log(data);
  });
  /***
   * will print out the 5 rows, the first row is a timestamp,
   * the second row is the user Id:
      [ 1395896604, '9f4533ce876717bc49b11fcb02015483' ]
      [ 1395896604, '9f4533ce876717bc49b11fcb02015483' ]
      [ 1395896610, '99766c220b35770d8fbe03f79cf326a7' ]
      [ 1395896610, 'b53293da8304e965c3d63a7b5bc7f0d4' ]
      [ 1395896611, '99766c220b35770d8fbe03f79cf326a7' ]
   */

You can also choose to return those items as an object, where the paths used to locate the fields will be used to contruct a minimal object with those selected values, just pass through true as the last field in #select:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.tail(5))
  .pipe(sl.select(['properties.time', 'properties.distinct_id'], true))
  .on('data', function (data) {
    console.log(data);
  });
  /***
   * will print out the 5 rows:
      { properties:
         { time: 1395896604,
           distinct_id: '9f4533ce876717bc49b11fcb02015483' } }
      { properties:
         { time: 1395896604,
           distinct_id: '9f4533ce876717bc49b11fcb02015483' } }
      { properties:
         { time: 1395896610,
           distinct_id: '99766c220b35770d8fbe03f79cf326a7' } }
      { properties:
         { time: 1395896610,
           distinct_id: 'b53293da8304e965c3d63a7b5bc7f0d4' } }
      { properties:
         { time: 1395896611,
           distinct_id: '99766c220b35770d8fbe03f79cf326a7' } }
   */

Often you just want to select a single field from an object, or get a sub-object by supplying a path to the field. Use #pluck with a path locator:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.pluck('properties'))
  .on('data', function (data) {
    console.log(data);
  })
  /***
   *  Will print out the child objects under the `properties` field:
      { time: 1395773040,
        distinct_id: '7eab3f90d1d3164608293f3d38759e4e',
        campaign_id: 146099,
        delivery_id: 90516639,
        message_id: 51683,
        message_type: 'email' }
      { time: 1395773053,
        distinct_id: '7eab3f90d1d3164608293f3d38759e4e',
        campaign_id: 146099,
        type: 'email' }
  */

Often you would like to filter a stream of objects based on the value of one of the fields. In this case, just supply a field locator, and the value that you are trying to find to retrict your results using the #where stream:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.where('properties.$browser', 'Chrome'))
  .on('data', function (data) {
    console.log(data.properties.$browser);
    // Should print out: 'Chrome' 
  })

For more powerful queries you can use the mongodb query syntax (as implemented by jsonquery. Just pass a query object to #where:

var sl = require('streamlined'),
myObjectModeStream()
  .pipe(sl.where({ 'properties.$browser': 'Chrome' }))
  .on('data', function (data) {
    console.log(data.properties.$browser);
    // Should print out: 'Chrome' 
  })

Say you want to transform every object in a stream. Simply provide a transform function to the #map stream:

var sl = require('streamlined'),
    crytpo = require('crypto');
 
// return an md5 of data 
function md5(data) {
  var md5sum = crypto.createHash('md5');
  md5sum.update(data);
  return md5sum.digest('hex');
}
 
// return the md5 of the properties.$browser field 
function md5browser(data) {
  data.properties.$browser = md5(data.properties.$browser);
  return data;
}
 
myObjectModeStream()
  .pipe(sl.map(md5browser))
  .on('data', function (data) {
    console.log(data.properties.$browser);
  })
  // will print out the md5 of the browser values 
  /**
    986c37480b1f1c2e443504b38b6361b4
    986c37480b1f1c2e443504b38b6361b4
    986c37480b1f1c2e443504b38b6361b4
    b4540da93e13d1326d68d2258e45446e
    986c37480b1f1c2e443504b38b6361b4
   */

Say you want to transform every object in a stream. Simply provide a transform function to the #map stream:

Often all you want to do is just iterate and call a function on every item in a stream:

var sl = require('streamlined');
 
myObjectModeStream()
  .pipe(sl.map(md5browser))
  .pipe(sl.data(console.log));
 
  // will print out the md5 of the browser values 
  /**
    986c37480b1f1c2e443504b38b6361b4
    986c37480b1f1c2e443504b38b6361b4
    986c37480b1f1c2e443504b38b6361b4
    b4540da93e13d1326d68d2258e45446e
    986c37480b1f1c2e443504b38b6361b4
   */

You can pass a second parameter to #data which gets called on the end event.

Often you want to simply collect all the items in a stream into a single array for batch processing. You can use #collect for this:

var sl = require('streamlined');
 
myObjectModeStream()
  .pipe(sl.map(md5browser))
  .pipe(sl.collect(console.log));
 
  // will print out an array of the md5 of the browser values 
  /**
    [ '986c37480b1f1c2e443504b38b6361b4',
      '986c37480b1f1c2e443504b38b6361b4',
      '986c37480b1f1c2e443504b38b6361b4',
      'b4540da93e13d1326d68d2258e45446e',
      '986c37480b1f1c2e443504b38b6361b4' ]
   */

You can pass a second parameter to #data which gets called on the end event.

Used in conjunction with some simple aggregating functions (such as those provided by defunct-aggregates, but you can easily write your own), you can do the object-stream equivalent of SQL SUMs and COUNTs, and MAX/MINs, here's a SUM:

var dagg = require('defunct-aggregates'),
    sl = require('streamlined');
myObjectStream()
  .pipe(sl.aggregate('properties.$initial_referring_domain', dagg.sum('properties.time')))
  .on('data', function (data) {
    console.log(data);
    /*
       {
        '$direct': 2465011296034,
        'www.something.com': 8374730552,
        'mail.qq.com': 18145722876,
        'cwebmail.mail.163.com': 2791450358,
        'm.email.seznam.cz': 4187199003,
        undefined: 2791546093,
        'm.facebook.com': 2791563670,
        'www.google.co.uk': 2791596886,
        'nm20.abv.bg': 2791603002,
        'www.google.com': 18146059483,
        'webmailb.netzero.net': 13958396912,
        'webmail.kitchenrefacers.ca': 4187590206,
        'www.ekit.com': 2791762707,
        'webmail.myway.com': 13958909358,
        'poczta.wp.pl': 2791781736 }
     */
  });

The #aggregate function takes two arguments. The first is a locator expression that will locate the field to GROUP BY. The second is an aggregating function.

The signature of the aggregating function is:

function (accumulatordata, [columnn name]) {
  // modify the accumulator 
  return accumulator;
}

To see more about how these are implemented, check out defunct-aggregates.

If the second argument is an map of property to functions, then you can do multiple aggregations:

var sl = require('streamlined');
myObjectStream()
  .pipe(sl.aggregate('properties.$initial_referring_domain', {
    sum: sum('properties.time'),
    max: max('properties.time')
  }))
  .on('data', function (data) {
    console.log(data);
    /*
     { '$direct': { sum: 2465011296034, max: 1395896611 },
       'www.something.com': { sum: 8374730552, max: 1395871232 },
       'mail.qq.com': { sum: 18145722876, max: 1395893241 },
       'cwebmail.mail.163.com': { sum: 2791450358, max: 1395725179 },
       'm.email.seznam.cz': { sum: 4187199003, max: 1395733001 },
       undefined: { sum: 2791546093, max: 1395773053 },
       'm.facebook.com': { sum: 2791563670, max: 1395781835 },
       'www.google.co.uk': { sum: 2791596886, max: 1395798443 },
       'nm20.abv.bg': { sum: 2791603002, max: 1395801501 },
       'www.google.com': { sum: 18146059483, max: 1395895873 },
       'webmailb.netzero.net': { sum: 13958396912, max: 1395839884 },
       'webmail.kitchenrefacers.ca': { sum: 4187590206, max: 1395863906 },
       'www.ekit.com': { sum: 2791762707, max: 1395881354 },
       'webmail.myway.com': { sum: 13958909358, max: 1395891210 },
       'poczta.wp.pl': { sum: 2791781736, max: 1395890868 } };
    */

The last argument to the #aggregate function is a boolean which you can use to ignore undefined results (ie. key values of undefined), or whether you want them there. The default is false (show the undefineds).

Who needs SQL, right?

Given the path of a user ID, and the path of the 'Event' that you wish to report on, and an array of the 'Events' that you want to report on for your funnel, using the #funnel stream will generate a report of how many user went through every stage of the funnel:

var sl = require('streamlined');
myObjectModeStream()
  .pipe(sl.funnel(
    'properties.distinct_id', // path to the user id 
    'event',                  // path to the event name 
    // list of events to report on for the funnel 
    ['Viewed Sales Page', 'Clicked Add To Cart', 'Viewed Beta Invite', 'Submitted Beta Survey']))
  .on('data', function (data) {
    console.log(data);
    /** Prints this report:
      { 'Viewed Sales Page': 399,
        'Clicked Add To Cart': 36,
        'Viewed Beta Invite': 36,
        'Submitted Beta Survey': 28 };
     */
  });
});

Makes it easy to output an object streams as CRLF JSON:

var sl = require('streamlined');
myObjectModeStream()
  .pipe(sl.stringify())
  .pipe(process.stdout)
 
// prints each object to stdout JSON.stringified with a new line character