Function Pipe

A function pipe manages and executes a series of operations between a source function and a target callback. Each operation in the series (often referred to as middleware) is executed in turn and may modify the source's result. Finally, a caller-provided target callback is invoked.
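The flow described above can be sketched in a few lines of plain JavaScript. This is only an illustration of the pattern, not fpipe's implementation; runPipe and its parameter names are hypothetical.

```javascript
// Minimal sketch of the function pipe pattern (not fpipe's own code).
// runPipe, source, middleware and target are illustrative names.
function runPipe(source, middleware, target) {
    var index = 0;
    // step receives the outcome of the previous stage and decides
    // whether to stop with an error, continue, or finish.
    function step(err, res) {
        if (err) { return target(err); }
        if (index === middleware.length) { return target(null, res); }
        var fn = middleware[index++];
        fn(res, step);
    }
    source(step);
}

var output = [];
runPipe(
    function (callback) { callback(null, 2); },
    [
        function (res, next) { next(null, res + 1); },
        function (res, next) { next(null, res * 10); }
    ],
    function (err, res) { output.push(err ? 'error' : res); }
);
console.log(output); // [ 30 ]
```

The source produces 2, the first middleware turns it into 3, the second into 30, and the target callback receives the final value.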

If you've used Node.js for a while, you are probably familiar with the function pipe pattern; it is used by connect, express, and flatiron, to name a few. The goal of the fpipe module is to extract the function pipe behavior common to these frameworks and encapsulate it for the community's use.

fpipe ensures executions of the pipeline are isolated from overlapping activity, including subsequent modifications to the pipe. It is useful for layering over asynchronous functions when numerous simultaneous executions are anticipated.
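One way to get this kind of isolation is to copy the middleware list at the moment an execution starts. The sketch below illustrates that idea; whether fpipe does exactly this internally is an assumption, and SketchPipe is a hypothetical stand-in rather than fpipe's Pipe class.

```javascript
// Sketch of one way to isolate an execution from later changes:
// copy the middleware list when execution starts. SketchPipe is a
// hypothetical class, not fpipe's implementation.
function SketchPipe(source) {
    this.source = source;
    this.middleware = [];
}
SketchPipe.prototype.use = function (fn) {
    this.middleware.push(fn);
    return this;
};
SketchPipe.prototype.execute = function (target) {
    var steps = this.middleware.slice(); // snapshot for this run only
    var index = 0;
    function step(err, res) {
        if (err) { return target(err); }
        if (index === steps.length) { return target(null, res); }
        steps[index++](res, step);
    }
    this.source(step);
};

var pending = [];
var isolated = new SketchPipe(function (callback) {
    pending.push(callback); // hold the callback to simulate latency
});
isolated.use(function (res, next) { next(null, res + ' -> first'); });

var seen = [];
isolated.execute(function (err, res) { seen.push(res); });

// Middleware added while the first execution is still in flight...
isolated.use(function (res, next) { next(null, res + ' -> second'); });
isolated.execute(function (err, res) { seen.push(res); });

// ...affects only the second execution.
pending.forEach(function (callback) { callback(null, 'start'); });
console.log(seen);
```

The first execution sees only the middleware that existed when it began, even though the pipe was modified before its source called back.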


Install with npm:

npm install fpipe


Tests are written using vows and should.js (you may need to install them). If you've installed fpipe in a development environment, you can use npm to run the tests:

npm test fpipe

Simple Example

var util = require('util'),
    fpipe = require('fpipe');

// Gets a random integer between 0 (zero) and max, inclusive.
function randomWait(max) {
    return Math.floor(Math.random() * (max + 1));
}

// Waits a random period of time before returning a dumb message to the
// callback given.
function do_something_uninteresting(callback) {
    // simulate latency
    var wait = randomWait(1000);
    setTimeout(function() {
        callback(null, "It took me ".concat(wait, " milliseconds to notice you."));
    }, wait);
}

// Logs it, then calls next.
function logIt(it, next) {
    console.log("logged: ".concat(it));
    next(null, it);
}

// Observes it (by writing it to the console), then calls next.
function observeIt(it, next) {
    console.log("observed: ".concat(it));
    next(null, it);
}

// Create an fpipe over our uninteresting function...
var pipe = fpipe.create(do_something_uninteresting);

// Add some middleware...
pipe.use(logIt);
pipe.use(observeIt);

// Execute the pipe...
pipe.execute(function(err, res) {
    if (err) {
        console.log("Got an error: ".concat(util.inspect(err, false, 99)));
    } else {
        console.log("Got a message: ".concat(res));
    }
});


A standard import of fpipe, var fpipe = require('fpipe'), is assumed in all of the code examples. The import results in an object with the following public properties:

  • create - a factory for creating fpipes.
  • log_sink - a shared event emitter where any uncaught exceptions are exposed.
  • Pipe - the fpipe implementation class.
  • version - exposes the module's version.


fpipe.create constructs a new function pipe, optionally over a source function.

// Create a pipe without a source function... 
var my = fpipe.create();


// Create a pipe over a source function... 
var my = fpipe.create(function(callback) {
    callback(null, { here_is: "a result" });
});


A function pipe derives from Node.js' EventEmitter and exposes the following properties, methods, and event, listed in that order:


  • _source - the source function over which the pipe will execute.
  • _middleware - an array containing the currently configured middleware on the pipe; this is the series of functions that will execute between the source function and the target callback.


  • use - adds a middleware function to the series of functions that the pipe will execute.
  • execute - invokes the source function, the middleware series, and ultimately the target callback.
  • source - sets the pipe's source function.
  • clone - clones the pipe, optionally changing its source function.


  • "uncaughtException" - occurs when the pipe encounters an uncaught exception. Generally speaking, this only occurs when your callback throws. Exceptions occurring during the series are given to your callback directly (as per the Node.js callback style).
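The event behavior described above can be illustrated with Node's own EventEmitter. This is a sketch under the assumption that a throwing target callback is caught and reported as an event; deliver is a hypothetical helper, not part of fpipe's API.

```javascript
var EventEmitter = require('events').EventEmitter;

// Sketch only: hand a result to the target callback and report anything
// the callback throws as an "uncaughtException" event.
// deliver is a hypothetical helper, not part of fpipe.
function deliver(emitter, target, err, res) {
    try {
        target(err, res);
    } catch (thrown) {
        emitter.emit('uncaughtException', thrown);
    }
}

var emitter = new EventEmitter();
var reported = [];
emitter.on('uncaughtException', function (e) {
    reported.push(e.message);
});

deliver(emitter, function (err, res) {
    throw new Error('callback failed on ' + res);
}, null, 'a result');

console.log(reported); // [ 'callback failed on a result' ]
```

Because a pipe derives from EventEmitter, a listener for this event would presumably be attached to the pipe with the standard on method.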


The use method adds a middleware function to the series of functions that the pipe will execute. It takes one argument:


  • middleware - a middleware function (described below).

To be useful, a middleware function should take the following two arguments:

  • res - the result of the predecessor step in the middleware pipeline.
  • next - the next middleware step in the pipeline.

Well-behaved middleware always calls next. The next function is a callback in the Node.js style, taking an error and a result. Logically, it behaves like this:

function myCallback(err, res) {
    if (err) {
        // a prior function in the series resulted in error,
        // stop the pipe and notify the target callback...
    } else {
        // no error yet, continue processing...
    }
}

Example of a well-behaved (albeit trivial) middleware function:

var my = fpipe.create(function(callback) {
    callback(null, { here_is: "a result" });
});

my.use(function(res, next) {
    next(null, res);
});

If a middleware function throws an error, it will be caught by the pipe. The pipe will behave as if the middleware had passed the error to next.
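The following sketch shows one way such behavior could be implemented. It is not fpipe's actual code; runGuarded is a hypothetical name, and the finished flag is an added safeguard so the target is invoked at most once.

```javascript
// Sketch of the described behavior (not fpipe's actual code): a throw
// inside middleware is converted into an error delivered to the target.
// runGuarded is a hypothetical name.
function runGuarded(source, middleware, target) {
    var index = 0;
    var finished = false;
    // finish invokes the target callback at most once.
    function finish(err, res) {
        if (finished) { return; }
        finished = true;
        target(err, res);
    }
    function step(err, res) {
        if (err) { return finish(err); }
        if (index === middleware.length) { return finish(null, res); }
        var fn = middleware[index++];
        try {
            fn(res, step);
        } catch (thrown) {
            finish(thrown); // same outcome as next(thrown)
        }
    }
    source(step);
}

var outcome = [];
runGuarded(
    function (callback) { callback(null, 'input'); },
    [
        function (res, next) { throw new Error('middleware blew up'); },
        function (res, next) { outcome.push('never reached'); next(null, res); }
    ],
    function (err, res) {
        outcome.push(err ? 'error: ' + err.message : 'result: ' + res);
    }
);
console.log(outcome); // [ 'error: middleware blew up' ]
```

The second middleware never runs, and the target callback receives the thrown error as its first argument.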


The execute method runs the function pipe, starting with the source function that you provided, continuing with each middleware in turn, and ending by invoking the given callback. It takes the following arguments:


  • callback - a Node.js-style callback taking an error and a result.
  • any..any-n - any value(s) that must be curried to the source function.

var fpipe = require('fpipe');

var pipe = fpipe.create();

pipe.use(function(res, next) {
    var modified = (res)
        ? 'Observed: '.concat(res)
        : 'Observed nothing.';
    next(null, modified);
});

// Create a pipe over a function that does not take any arguments
// other than the callback...
pipe.source(function(callback) {
    callback(null, "Somebody poked me!");
});

// Execute the pipe, providing a callback.
pipe.execute(function(err, res) {
    if (err) {
        console.log('Got an error: ' + err);
    } else {
        console.log('Got a result: ' + res);
    }
});

// Create a clone, replacing the source with
// a function that expects an argument...
var pipe2 = pipe.clone(function(you, callback) {
    // simulate some latency, then say hi...
    setTimeout(function() {
        callback(null, 'Hi ' + you);
    }, 1000);
});

// Since the source of the second pipe takes an argument,
// we pass it after our callback. Any arguments following
// the callback are curried to the source in front of
// the callback, so you'll want to ensure that enough
// arguments are passed (even if some are null) so that
// the callback is positioned correctly.
pipe2.execute(function(err, res) {
    if (err) {
        console.log('Got an error: ' + err);
    } else {
        console.log('Got a result: ' + res);
    }
}, 'tester');
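The argument positioning described in the comments above can be sketched as follows. This is an assumed reading of the behavior, not code taken from fpipe; executeWith, greet and greetTwo are hypothetical names, and middleware is omitted to keep the focus on the arguments.

```javascript
// Sketch of the argument handling described above (assumed, not taken
// from fpipe's source): values given after the callback are placed in
// front of the callback when the source is invoked.
function executeWith(source, target /*, any..any-n */) {
    var extra = Array.prototype.slice.call(arguments, 2);
    source.apply(null, extra.concat(target));
}

function greet(you, callback) {
    callback(null, 'Hi ' + you);
}

var greetings = [];
executeWith(greet, function (err, res) { greetings.push(res); }, 'tester');

// A source with two leading parameters needs two values, even if one is
// null; otherwise the callback would land in the `second` position and
// the source would have no callback to call.
function greetTwo(first, second, callback) {
    callback(null, 'Hi ' + first + (second ? ' and ' + second : ''));
}
executeWith(greetTwo, function (err, res) { greetings.push(res); }, 'tester', null);

console.log(greetings); // [ 'Hi tester', 'Hi tester' ]
```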