node package manager

aggregate-stream

fluent aggregate query builder with promise and streaming support

MongoDB Aggregate Stream

NPM version build status Test coverage Gittip

A wrapper around MongoDB's .aggregate() function:

  • Returns a proper stream
  • Easier pipeline building
  • Return promises

This is based on maggregate which is not up to date with MongoDB 2.6+ and friends.

API

var aggregate = require('aggregate-stream');
 
// return all the results at once 
aggregate(collection)
.match({
  'identity': 'person'
})
.group({
  _id: '$_id',
  count: {
    $sum: 1
  }
})
.toArray(function (err, docs) {
 
})
 
// as a promise 
aggregate(collection)
// pipeline... 
.then(function (docs) {
 
}, function (err) {
 
})
 
// as a stream 
http.createServer(function (req, res) {
  res.setHeader('content-type', 'application/json')
  aggregate(collection)
  // pipeline... 
  .pipe(JSONStream.stringify())
  .pipe(res)
})

aggregate(collection, [options])

Return a new aggregation instance with options. See options: http://mongodb.github.io/node-mongodb-native/api-generated/collection.html#aggregate

.set(key, value)

Set an option after initialization.

aggregate(collection).set('readPreference', 'secondaryPreferred')

.transform(fn)

Manipulate each document before returning them.

aggregate(collection)
.transform(function (doc) {
  doc.transformed = true;
  return doc
})

.[operators](obj)

Each operator is its own method. See: http://docs.mongodb.org/manual/meta/aggregation-quick-reference/#aggregation-operator-quick-reference

aggregate(collection)
.match({
  type: 'human'
})

.toArray([cb])

Return all the results as a single array. If no callback is supplied, a promise is returned.

aggregate(collection)
// pipeline... 
.toArray(function (err, docs) {
 
})

.explain([cb])

Return the "explain" on the query. If no callback is supplied, a promise is returned.

aggregate(collection)
// pipeline... 
.then(console.log)

.destroy()

Destroy the stream. Use this to prevent any leaks.

http.createServer(function (req, res) {
  var stream = aggregate(collection)
 
  req.socket.once('close', function () {
    // always make sure this cursor is closed when the request is finished 
    stream.destroy()
  })
})

.then(resolve, reject)

A wrapper around .toArray() that can be implicitly called by a control flow engine.

co(function* () {
  var docs = yield aggregate(collection).match().group() // ... 
})

This makes each instance a "promise", but you shouldn't be using .then() directly - use .toArray() instead.