pull-window

Aggregate a pull-stream into windows.

pull-window

Aggregate a pull-stream into windows.

Several helpers are provided for particular types of windows: sliding, tumbling, etc.

There is also a low-level window(start, map) function for building custom windows.

Example: a tumbling window that sums every 10 items.

var pull   = require('pull-stream')
var window = require('pull-window')
 
/**
 * Tumbling window: close a window with the sum of every ten items.
 *
 * @returns {*} whatever window() returns; in the usage below it is
 *   placed as a stage inside a pull() pipeline.
 */
function everyTen () {
  //number of items added to the currently open window
  //(0 means no window is open)
  var i = 0
  //window calls init with each data item,
  //and a callback to close that window.
  return window(function (data, cb) {
    //if you don't want to start a window here,
    //return undefined
    if(i !== 0) return
    var sum = 0

    //else return a function.
    //this will be called with all data
    //until you callback.
    return function (end, data) {
      //the stream ended: close the window with the partial sum
      if(end) return cb(null, sum)
      sum += data
      if(++i >= 10) {
        //reset the counter so the next item starts a new window
        i = 0
        cb(null, sum)
      }
    }
  })
}
 
//pipeline: count source -> ten-item sums -> log sink
pull(pull.count(1000), everyTen(), pull.log())

Windows don't all have to be the same size. This example closes each window once its sum reaches 100 or more.

var pull   = require('pull-stream')
var window = require('pull-window')
 
/**
 * Variable-sized window: keep adding items to the current window until
 * its sum reaches 100 or more, then close it and start again.
 *
 * @returns {*} whatever window() returns; in the usage below it is
 *   placed as a stage inside a pull() pipeline.
 */
function groupTo100 () {
  //null means no window is currently open
  var sum = null
  return window(function (_, cb) {
    //a window is already open, don't start another
    if(sum != null) return
    //start from 0 explicitly instead of relying on null + number
    sum = 0

    //sum stuff together until you have 100 or more
    return function (end, data) {
      //the stream ended: close the window with the partial sum
      if(end) return cb(null, sum)
      sum += data
      if(sum >= 100) {
        //copy sum like this, in case the next item
        //comes through sync
        var _sum = sum; sum = null
        cb(null, _sum)
      }
    }
  })
}
 
//pipeline: count source -> variable-sized sums -> log sink
pull(
  pull.count(1000),
  groupTo100(),
  pull.log()
)

To make overlapping windows, just return the window function more often. In this example a new window starts on every item.

var pull   = require('pull-stream')
var window = require('pull-window')
 
/**
 * Sliding window: every item starts a new window, and each window
 * closes with the sum of the ten items it has seen.
 *
 * @returns {*} whatever window() returns; in the usage below it is
 *   placed as a stage inside a pull() pipeline.
 */
function sliding () {
  return window(function (_, cb) {
    //each window has its own running sum and item count
    var sum = 0, i = 0

    //sum stuff together until this window has seen 10 items
    return function (end, data) {
      //the stream ended: close the window with the partial sum
      if(end) return cb(null, sum)
      sum += data
      if(++i >= 10) {
        //in this example, each window gets its own sum,
        //so we don't need to copy it.
        cb(null, sum)
      }
    }
  })
}
 
//pipeline: count source -> sliding ten-item sums -> log sink
pull(
  pull.count(100),
  sliding(),
  pull.log()
)
 
//window(start, map): start decides when a window opens,
//map (optional) shapes what each closed window emits.
window(function startWindow (data, cb) {

  //called on each chunk
  //including the first one
  return function addToWindow (end, data) {
    //cb(null, aggregate) when done.
  }
}, function mapWindow (start, data) {
  //(optional)
  //map the window to something that tracks start, also
})

By default, windows are mapped to {start: firstData, data: aggregate}, unless you pass in a different mapWindow function.

window.sliding(reduce, size): reduce every size items into a single value, in a sliding window.

window.recent(size, time): a tumbling window that groups items into an array, either every size items or within time ms, whichever occurs first.

MIT