aggregate-stream

fluent aggregate query builder with promise and streaming support

MongoDB Aggregate Stream

A wrapper around MongoDB's .aggregate() function:

  • Returns a proper stream
  • Easier pipeline building
  • Returns promises

This is based on maggregate, which is not up to date with MongoDB 2.6+ and friends.

var aggregate = require('aggregate-stream');
 
// return all the results at once 
aggregate(collection)
.match({
  'identity': 'person'
})
.group({
  _id: '$_id',
  count: {
    $sum: 1
  }
})
.toArray(function (err, docs) {
 
})
 
// as a promise 
aggregate(collection)
// pipeline... 
.then(function (docs) {
 
}, function (err) {
 
})
 
// as a stream 
http.createServer(function (req, res) {
  res.setHeader('content-type', 'application/json')
  aggregate(collection)
  // pipeline... 
  .pipe(JSONStream.stringify())
  .pipe(res)
})

Return a new aggregation instance with the given options. For the available options, see: http://mongodb.github.io/node-mongodb-native/api-generated/collection.html#aggregate

Set an option after initialization.

aggregate(collection).set('readPreference', 'secondaryPreferred')

Manipulate each document before it is returned.

aggregate(collection)
.transform(function (doc) {
  doc.transformed = true;
  return doc
})

Each operator is its own method. See: http://docs.mongodb.org/manual/meta/aggregation-quick-reference/#aggregation-operator-quick-reference

aggregate(collection)
.match({
  type: 'human'
})

Return all the results as a single array. If no callback is supplied, a promise is returned.

aggregate(collection)
// pipeline... 
.toArray(function (err, docs) {
 
})

Return the "explain" on the query. If no callback is supplied, a promise is returned.

aggregate(collection)
// pipeline... 
.then(console.log)

Destroy the stream. Use this to prevent any leaks.

http.createServer(function (req, res) {
  var stream = aggregate(collection)
 
  req.socket.once('close', function () {
    // always make sure this cursor is closed when the request is finished 
    stream.destroy()
  })
})

A wrapper around .toArray() that can be implicitly called by a control flow engine.

co(function* () {
  var docs = yield aggregate(collection).match().group() // ... 
})

This makes each instance a "promise", but you shouldn't be using .then() directly - use .toArray() instead.