asynquence-contrib

contrib plugins for asynquence

Optional asynquence plugin helpers. The full bundle of plugins (contrib.js) is ~3.1k minzipped.

To integrate asynquence into standard callback-oriented code bases, sometimes it's preferable to create wrappers around commonly used callback-oriented functions, to be used in place of the original functions. The wrapper automatically constructs an asynquence instance when called, and wires up the underlying call to the original callback-oriented function so that it maps its output behavior to the asynquence instance.

For example, in node.js, we can call ASQ.wrap(..) to wrap fs.readFile(..), suppressing the callback in its signature and turning it into an asynquence-returning function:

var readfile = ASQ.wrap(fs.readFile);
 
readfile("something.txt",{ encoding: "utf8" })
.val(function(contents){
    // file contents 
})
.or(function(err){
    // oops, `err` in reading file! 
});

Note: ASQ.wrap(..) creates a function which will automatically be async in nature, even if the underlying function would normally have called its callback immediately/synchronously. DO NOT RELY on ordered side-effects of such wrapped functions.

Most of node.js's standard functions expect an "error-first" style callback, and they also expect it to be at the end of the arguments list (aka "parameters first"). The default settings for ASQ.wrap(..) assume that sort of function signature.

However, you may need to use asynquence with other sorts of function signatures.

For example, some functions are the opposite in parameter order (aka "parameters last"), where the callback must be the first argument and any other parameters are passed after it. You can pass an options-object as the second parameter to ASQ.wrap(..) to signal alternative function signature behavior:

function doSomething(cb,p1,p2) {
    // do something with `p1` and `p2`, then later 
    // call `cb` as an error-first cb 
}
 
var better = ASQ.wrap(doSomething,{ params_last: true });
 
better("val 1","val 2")
.val(function(result){
    // result 
})
.or(function(err){
    // oops, `err` occurred! 
});

You may also want to specify a particular this binding to use with the underlying function/method call. You can do so with the normal JS .bind(..) utility, like ASQ.wrap( fn.bind(obj) ), but that gives you a permanent hard-binding that can't be overridden, which may or may not be suitable.

If you want the more flexible "soft binding" (an alternate default this binding -- instead of window / global -- that can still be overridden), you can specify a this in the options object, like so:

function doSomething(cb) {
    cb(this.id);
}
 
var o1 = { id: 42 };
var o2 = { id: "foobar" };
 
var better = ASQ.wrap(doSomething,{ this: o1 });
 
// use `o1` as default soft-bound `this` 
better()
.val(function(id){
    id; // 42 
});
 
// `this` is still overridable 
better.call(o2)
.val(function(id){
    id; // "foobar" 
});
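The difference between hard and soft binding can be sketched in plain JavaScript. The softBind(..) helper below is hypothetical and is not how asynquence implements the this option; it only illustrates the idea of a default this that an explicit call-site binding can still replace.

```javascript
// Hypothetical helper (not part of asynquence): wraps `fn` so that
// `defaultThis` is used only when the call supplies no `this` of its own.
function softBind(fn, defaultThis) {
    return function softBound() {
        // a plain call leaves `this` as `undefined` (strict mode)
        // or as the global object (sloppy mode)
        var useDefault = (this === undefined || this === globalThis);
        return fn.apply(useDefault ? defaultThis : this, arguments);
    };
}

function getId() {
    return this.id;
}

var o1 = { id: 42 };
var o2 = { id: "foobar" };

var soft = softBind(getId, o1);
var hard = getId.bind(o1);

var softDefault = soft();           // 42
var softOverride = soft.call(o2);   // "foobar"
var hardOverride = hard.call(o2);   // 42 (hard binding ignores `.call(..)`)
```

The hard-bound function keeps o1 no matter how it is called, while the soft-bound one falls back to o1 only when nothing else is provided.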

The complete list of options you can pass:

  • this: (default: { }) specifies a soft-binding (aka, alternate default) for this for the underlying function call being wrapped
  • params_first: (default: true) signals "parameters first" style signature
  • params_last: (default: false) signals "parameters last" style signature
  • errfcb: (default: true) signals "error-first" style callback expected
  • splitcb: (default: false) signals split success and error callbacks expected
  • simplecb: (default: false) signals simple (success-only) callback expected, which assumes an error is either passed opaquely (inaccessible to asynquence handling) to the callback in some way (which you must handle), or an error is thrown to be try..catch caught (which asynquence will handle)
  • gen: (default: false) signals that you've passed in a generator (ES6) to wrap (see below).

Several combinations of these options are mutually exclusive and would be ambiguous, so they are not allowed: wrap(..) throws an error immediately when called with one, such as errfcb: false, params_first: true, params_last: true, etc. Just avoid these. Also, params_first: false is allowed, and just means params_last: true, but the latter is preferable.
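To make the callback conventions concrete, here is a rough plain-JavaScript sketch that adapts each style to a native Promise. The wrapSketch(..) helper is hypothetical and is not ASQ.wrap(..); it borrows the option names from the list above, and both its validation rules and the success-then-error order of split callbacks are assumptions made for illustration.

```javascript
// Hypothetical helper, NOT asynquence's ASQ.wrap(..): adapts a
// callback-style function so it returns a native Promise instead.
function wrapSketch(fn, opts) {
    opts = opts || {};

    // assumed rule for this sketch: reject contradictory settings up front
    if (opts.params_first && opts.params_last) {
        throw new Error("params_first and params_last are mutually exclusive");
    }
    if (opts.splitcb && opts.simplecb) {
        throw new Error("splitcb and simplecb are mutually exclusive");
    }

    var paramsLast = !!opts.params_last;
    var style = opts.splitcb ? "split" : opts.simplecb ? "simple" : "errfcb";

    return function wrapped() {
        var self = this;
        var args = Array.prototype.slice.call(arguments);

        return new Promise(function executor(resolve, reject) {
            var cbs;
            if (style === "split") {
                // assumed order: success callback, then error callback
                cbs = [resolve, reject];
            }
            else if (style === "simple") {
                // success-only callback; a thrown error rejects the promise
                cbs = [resolve];
            }
            else {
                cbs = [function errfcb(err, val) {
                    if (err) reject(err);
                    else resolve(val);
                }];
            }
            // callbacks go before or after the regular parameters
            fn.apply(self, paramsLast ? cbs.concat(args) : args.concat(cbs));
        });
    };
}

// parameters first, error-first callback (the default style)
function add(a, b, cb) {
    cb(null, a + b);
}

// parameters last, split success/error callbacks
function div(onOk, onErr, a, b) {
    if (b === 0) onErr("divide by zero");
    else onOk(a / b);
}

var safeDiv = wrapSketch(div, { params_last: true, splitcb: true });

var results = Promise.all([
    wrapSketch(add)(1, 2),
    safeDiv(6, 3),
    safeDiv(1, 0).catch(function (err) { return "caught: " + err; })
]);
// `results` resolves to [ 3, 2, "caught: divide by zero" ]
```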

If you pass gen:true as an option, it overrides all other options, and wrap(..) instead returns a function that creates a new asynquence sequence with the runner(..) plugin (see below) wired to run the generator you passed in. Whatever arguments you pass to the wrapper will pass into the generator (accessed via token.messages -- again, see runner(..) plugin below).

var g = ASQ.wrap(function*(token){
    var x = 1;
    for (var i=0; i < token.messages.length; i++) {
        x = yield (x * token.messages[i]);
    }
},{ gen:true });
 
g(2,3,4)
.val(function(msg){
    console.log(msg);   // 24 
});
 
g(2,3,4,5)
.val(function(msg){
    console.log(msg);   // 120 
});

The wrapper can be called one or many times, and each time will create and return a new sequence to run the generator.

  • any(..) is like gate(..), which waits for all segments to complete, except just one segment has to eventually succeed to proceed on the main sequence.

  • first(..) is like any(..), except as soon as any segment succeeds, the main sequence proceeds (ignoring subsequent results from other segments).

  • race(..) is like first(..), except the main sequence proceeds as soon as any segment completes (either success or failure).

  • last(..) is like any(..), except only the latest segment to complete successfully sends its message(s) along to the main sequence.

  • none(..) is the inverse of gate(..): the main sequence proceeds only if all the segments fail (with all segment error message(s) transposed as success message(s) and vice versa).

  • map(arr, eachFn) allows an asynchronous mapping of an array's values to another set of values. map(..) constructs a gate of segments, one for each item in arr. Each segment invokes eachFn(..) for the respective item in the array.

    eachFn(item, doneTrigger, ..) receives the respective item in the array and a doneTrigger to invoke with the new value to map back to that array position. Note: If multiple values are passed, that item's value will be an array (asynquence message wrapper) collection of the values passed.

    Just like with normal gates, eachFn(..) also receives any sequence messages passed forward from the previous main sequence step, such as eachFn(item, doneTrigger, msg1, msg2, ..). And, if any segment causes an error, the rest of the map(..) fails and the main sequence is flagged as error'd.

    If either arr, eachFn or both are not passed to map(..), it will attempt to pull them from the value-message stream it received from the previous step. Even if it does so, any subsequent messages in the stream will still pass on to the eachFn callback.

    The final success message from a map(..) sequence step is the newly constructed array of mapped values.

  • until(..) is like then(..), except it keeps re-trying until success or break() (for loop semantics) before the main sequence proceeds.

  • try(..) is like then(..), except it proceeds as success on the main sequence regardless of success/failure signal. If an error is caught, it's transposed as a special-format success message: { catch: ... }.

  • waterfall(..) is like a sequence of then(..)s, except the output from each step is tracked, and the aggregate of all steps' success messages thus far is the input messages to the next step (step 3 gets passed success output from both 1 and 2, etc). Thus, the final output success message(s) of waterfall(..) is the collection of all success messages from the waterfall's steps.

    An error anywhere along the waterfall behaves like an error in any sequence, immediately jumping to error state and aborting any further success progression.
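The timing differences between any(..), first(..), race(..) and none(..) can be approximated with native Promises. This is only a sketch under stated assumptions: anyLike(..) and noneLike(..) are hypothetical helpers, Promise.any(..) and Promise.allSettled(..) need a reasonably recent JS engine, and the exact message format asynquence forwards may differ from what is shown here.

```javascript
// A segment that settles after `ms` with `msg`, failing if `fail` is set.
function seg(ms, msg, fail) {
    return new Promise(function (resolve, reject) {
        setTimeout(function () {
            (fail ? reject : resolve)(msg);
        }, ms);
    });
}

// fresh segments each time: "A" fails first, then "B" and "C" succeed
function segs() {
    return [ seg(5, "A", true), seg(10, "B"), seg(20, "C") ];
}

function fulfilled(r) {
    return r.status === "fulfilled";
}

// any(..)-like: wait for all segments; proceed if at least one succeeded
function anyLike(list) {
    return Promise.allSettled(list).then(function (rs) {
        var ok = rs.filter(fulfilled);
        if (ok.length === 0) {
            throw rs.map(function (r) { return r.reason; });
        }
        return ok.map(function (r) { return r.value; });
    });
}

// none(..)-like: proceed only if every segment failed, with success
// and error messages swapping roles
function noneLike(list) {
    return Promise.allSettled(list).then(function (rs) {
        var ok = rs.filter(fulfilled);
        if (ok.length > 0) {
            throw ok.map(function (r) { return r.value; });
        }
        return rs.map(function (r) { return r.reason; });
    });
}

function describeFailure(err) {
    return "failed: " + [].concat(err).join(",");
}

var outcome = Promise.all([
    anyLike(segs()),                              // [ "B", "C" ]
    Promise.any(segs()),                          // first(..)-like: "B"
    Promise.race(segs()).catch(describeFailure),  // race(..)-like: "failed: A"
    noneLike(segs()).catch(describeFailure)       // "failed: B,C"
]);
```

Because "A" fails before anything succeeds, the race-like variant ends in failure while the first-like variant keeps waiting for "B".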

pThen plugin provides a pThen(..) sequence method which is a cousin to the core built-in then(..), but it works instead with similar semantics/behavior to native ES6 Promises. In other words, if you prefer the way then(..) works with Promises over asynquence's then(..), just use pThen(..) instead. Note: pThen(..) doesn't have the extra sugar capabilities like then(..) does, such as being able to accept sequences as direct parameters, etc -- it does just what Promise then(..) does.

Also provided is pCatch(..) which has the same semantics as Promise catch(..): it's literally the same as calling pThen(null, ..).

Example:

ASQ(21)
.pThen(function(msg){
    return msg * 2;
})
.pThen(function(msg){
    return ASQ(function(done){
        setTimeout(function(){
            done(msg);
        },100);
    });
})
.pThen(function(msg){
    return new Promise(function(resolve,reject){
        setTimeout(function(){
            reject("Oops:" + msg);
        },100);
    });
})
// sequence is now in error state! 
.pThen( // or .pCatch(... 
    null,
    function(err) {
        return err.toUpperCase();
    }
)
// sequence no longer in error state since 
// `pThen`/`pCatch` registered an error handler 
// which handled the sequence error. 
.pThen(
    function(msg) {
        throw msg;
    }
)
// sequence is now in error state again! 
.pCatch( // or .pThen(null, ... 
    function(err){
        console.log(err); // "OOPS:42" 
        return "Cool";
    }
)
// sequence no longer in error state 
.val(function(msg){
    console.log(msg); // "Cool" 
})
.or(function(){}); // never called 

You'll notice differences from the asynquence core then(..), and how they match Promise then(..) behaviors instead:

  1. pThen(..) takes a success and/or failure handler (both optional), rather than multiple then handlers.
  2. The success handler is not provided the done trigger.
  3. Instead, you return either an immediate value (which then passes on to the next step at the next cycle), or a sequence or promise for a value, in which case the sequence/promise is "unwrapped", and procession occurs only after it resolves.
  4. If you register a failure handler via pThen(..) or pCatch(..), then it swallows (so you can handle) any sequence errors to that point, and essentially resets the sequence back to success state after it passes.
  5. You can also return a value from a failure handler, which is the success message passed onto the next step. Note: Just like with Promises, a sequence/promise returned from a failure handler is not "unwrapped" -- it's just passed along as a normal value.

after plugin provides a sequence instance method after(..) which inserts a delay into a sequence at that step. The first parameter is a number of milliseconds to wait. (Optional) additional parameters provide sequence messages to pass along (overriding previous sequence messages). Otherwise, previous sequence messages pass-through the delay automatically.

after plugin also provides a static method version ASQ.after(..) which is the same as ASQ().after(..).

ASQ(42) // `42` gets discarded 
.after(500,"Hello","World!")
.val(function(msg1,msg2){
    console.log(msg1,msg2); // "Hello"  "World!" 
});
 
ASQ.after(500)
.val(function(){
    console.log("Hello World!");
});

failAfter plugin provides both the sequence method failAfter(..) and the static method ASQ.failAfter(..), which work exactly like the after plugin methods above, but result in failure rather than success.

The most common usage of the failAfter plugin is likely in combination with the race(..) plugin, to create "timeout" behavior:

// make a 2 sec timeout for some action 
ASQ()
.race(
    doSomethingAsync(..),
    ASQ.failAfter(2000,"Timeout!")
)
.val(function(){
    // success! 
})
.or(function(err){
    err; // "Timeout!" 
});

iterable-sequence plugin provides ASQ.iterable() for creating iterable sequences. See Iterable Sequences for more information, and examples: sync loop and async loop.

toPromise plugin provides .toPromise() (takes zero parameters) on an asynquence sequence instance's API, which allows you to vend/fork a new native Promise that's chained off of your sequence. Use this plugin if you need to send an asynquence sequence instance into some other utility which requires a thenable or standard Promises/A+ compliant promise.

Note: The vended promise is forked off the sequence, leaving the original sequence intact, to be continuable as normal. The message(s) (both success and error) from the chain are passed along to the promise, but they are also retained in the sequence itself, as if the forked-off promise is ignored.

Example:

// make an asynquence sequence to use 
var sq = ASQ(function(done){
    setTimeout(function(){
        done(42); // send 42 along as success message 
    },100);
});
 
// fork and deal with the native promise 
sq.toPromise()
.then(
    // success 
    function(msg){
        console.log(msg); // 42 
    },
    // error 
    function(err){
        console.log(err);
    }
);
 
// also continue with the original sequence 
sq
.val(function(msg){
    console.log(msg); // 42 
});

The goal of asynquence is to provide everything you need for promises-based async flow control without you needing to expose and use native promises or other promise libraries/utilities. Theoretically, this plugin should only be used when asynquence is insufficient in some way. If you find yourself needing to regularly vend native promises from asynquence, perhaps asynquence needs to be extended to handle that use-case, so let us know!


If you're using asynquence in an older environment which doesn't have the native ES6 Promise built-in, but you still want to be able to use the .toPromise() utility, you need a Promise polyfill. There are plenty of choices out there, but a great one to consider is:

Native Promise Only

As long as either the native Promise is there, or that global has been spec-compliant polyfilled, this toPromise plugin can create promises off your asynquence sequences.


errfcb plugin provides errfcb() on the main sequence instance API. Calling errfcb() returns an "error-first" style (aka "node-style") callback that can be used with any method that expects such a callback.

If the "error-first" callback is then invoked with the first ("error") parameter set, the main sequence is flagged for error as usual. Otherwise, the main sequence proceeds as success. Messages sent to the callback are passed through to the main sequence as success/error as expected.

Example:

// node.js: fs.readFile wrapper 
function readFile(filename) {
    // setup an empty sequence (much like an empty 
    // promise) 
    var sq = ASQ();
 
    // call node.js `fs.readFile(..)`, but pass in 
    // an error-first callback that is automatically 
    // wired into a sequence 
    fs.readFile( filename, sq.errfcb() );
 
    // now, return our sequence/promise 
    return sq;
}
 
readFile("meaningoflife.txt")
.then(..)
..

runner(..) takes either an iterable-sequence or an ES6 generator function, which will be iterated through step-by-step. runner(..) will handle either asynquence sequences, standard promises/thenables, thunks (see "thunks" here), or immediate values as the yielded/returned values from the generator or iterable-sequence steps.

The generator/iterable-sequence will receive any value-messages from the previous sequence step (via the control token -- see CSP-style Concurrency below for explanation), and the final yielded/returned value will be passed along as the success message(s) to the next main sequence step. Error(s) if any will flag the main sequence as error, with error messages passed along as expected.

Using generators:

function thunkDouble(x) {
    return function thunk(cb) {
        setTimeout(function(){
            // cb is an error-first style callback 
            cb(null,x * 2);
        },500);
    };
}
 
function promiseDouble(x) {
    // using ES6 `Promise`s 
    return new Promise(function(resolve,reject){
        setTimeout(function(){
            resolve(x * 2);
        },500);
    });
}
 
function seqDouble(x) {
    return ASQ(function(done){
        setTimeout(function(){
            done(x * 2);
        },500);
    });
}
 
ASQ(2)
.runner(function*(token){
    // extract message from control-token so 
    // we can operate on it 
    var x = token.messages[0];
 
    while (x < 100) {
        if (x < 10) {
            x = yield thunkDouble(x);
        }
        else if (x < 40) {
            x = yield promiseDouble(x);
        }
        else {
            x = yield seqDouble(x);
        }
    }
})
.val(function(num){
    console.log(num); // 128 
});
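For readers curious how a generator can be driven this way, here is a minimal sketch of the general technique using native Promises. The runSketch(..) and normalize(..) helpers are hypothetical and are not asynquence's runner(..): they pass arguments directly instead of via a control token, handle only thunks, promises/thenables and immediate values, and forward the generator's return value rather than its last yielded value.

```javascript
// Normalize a yielded value: error-first thunk, promise/thenable,
// or plain immediate value.
function normalize(v) {
    if (typeof v === "function") {
        return new Promise(function (resolve, reject) {
            v(function cb(err, val) {
                if (err) reject(err);
                else resolve(val);
            });
        });
    }
    return Promise.resolve(v);
}

// Hypothetical mini-runner: resumes the generator with the resolved
// value of whatever it yields, and throws rejections back into it.
function runSketch(genFn) {
    var args = Array.prototype.slice.call(arguments, 1);
    var it = genFn.apply(null, args);

    return new Promise(function (resolve, reject) {
        function step(method, input) {
            var res;
            try {
                res = it[method](input);
            }
            catch (err) {
                return reject(err);
            }
            if (res.done) {
                return resolve(res.value);
            }
            normalize(res.value).then(
                function (v) { step("next", v); },
                function (e) { step("throw", e); }
            );
        }
        step("next");
    });
}

function thunkDouble(x) {
    return function thunk(cb) {
        setTimeout(function () { cb(null, x * 2); }, 10);
    };
}

function promiseDouble(x) {
    return new Promise(function (resolve) {
        setTimeout(function () { resolve(x * 2); }, 10);
    });
}

var doubled = runSketch(function* (x) {
    while (x < 100) {
        x = yield (x < 10 ? thunkDouble(x) : promiseDouble(x));
    }
    return x;
}, 2);
// `doubled` resolves to 128
```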

Using iterable-sequences:

function thunkDouble(x) {
    return function thunk(cb) {
        setTimeout(function(){
            // cb is an error-first style callback 
            cb(null,x * 2);
        },500);
    };
}
 
function promiseDouble(x) {
    // using ES6 `Promise`s 
    return new Promise(function(resolve,reject){
        setTimeout(function(){
            resolve(x * 2);
        },500);
    });
}
 
function seqDouble(x) {
    return ASQ(function(done){
        setTimeout(function(){
            done(x * 2);
        },500);
    });
}
 
ASQ(2)
.runner(
    ASQ.iterable()
    .then(function(token){
        // extract message from control-token so 
        // we can operate on it 
        return token.messages[0];
    })
    .then(thunkDouble)
    .then(promiseDouble)
    .then(seqDouble)
)
.val(function(num){
    console.log(num); // 16 
});

runner(..) can accept 2 or more generators (or iterable-sequences) that you can cooperatively interleave execution of, which lets you leverage a simple form of CSP-style coroutine concurrency (aka "cooperative multitasking").

Generators/iterable-sequences will receive a control token with a messages channel (.messages property is a simple array) to use for passing messages back and forth as the coroutines interleave.

If you yield (or return in the case of iterable-sequences) that control token back (or a sequence/promise that eventually produces it), then you will signal to transfer control to the next (round-robin ordering style) generator/sequence in the concurrency-grouping.

Otherwise, yielding/returning of any other type of value, including a sequence/promise, will retain control with the current generator/iterator-step.
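The control-transfer rule can be sketched with a small synchronous scheduler. The roundRobin(..) function below is hypothetical and far simpler than runner(..): it does not wait on sequences or promises, and dropping a finished generator from the rotation is an assumption of this sketch.

```javascript
// Hypothetical scheduler: yielding the token hands control to the next
// generator; yielding anything else keeps control where it is.
function roundRobin(genFns) {
    var token = { messages: [] };
    var its = genFns.map(function (genFn) {
        return genFn(token);
    });
    var idx = 0;

    while (its.length > 0) {
        var res = its[idx].next();
        if (res.done) {
            // finished generators leave the rotation
            its.splice(idx, 1);
            if (idx >= its.length) idx = 0;
        }
        else if (res.value === token) {
            // transfer control to the next generator in line
            idx = (idx + 1) % its.length;
        }
    }

    return token.messages;
}

function* ping(token) {
    token.messages.push("ping 1");
    yield token;
    token.messages.push("ping 2");
}

function* pong(token) {
    token.messages.push("pong 1");
    yield token;
    token.messages.push("pong 2");
}

var log = roundRobin([ping, pong]);
// [ "ping 1", "pong 1", "ping 2", "pong 2" ]
```

Both generators write to the same messages array, which is what lets them cooperate while they interleave.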

You can also call .add(..) on the control token to add one or more generators/iterable-sequences to the concurrency-grouping:

// promise to double `v` in 1000 ms 
function double(v) {
    return new Promise(function(resolve,reject){
        setTimeout(function(){
            resolve(v * 2);
        },1000);
    });
}
 
function makeGen(x,y) {
    return function*(token){
        token.messages.push( yield double(x) );
        yield token;
        token.messages.push( yield double(y) );
    };
}
 
ASQ()
.runner(
    function*(token) {
        token.add(
            makeGen(10,20),
            makeGen(100,200)
        );
        while (token.messages.length < 4) {
            yield token;
        }
        yield token.messages;
    }
)
.val(function(msg){
    console.log(msg); // [ 20, 200, 40, 400 ] 
});

With both generators and iterable-sequences, the last non-undefined value that is yielded/returned from the concurrency-grouping run will be the forward-passed message(s) to the next step in your main asynquence chain.

If you want to pass on the channel messages from your generator run, end your last generator by yielding out the .messages property of the control token (see above snippet). Likewise with iterable-sequences, return the channel messages from the last iterable-sequence step.

To get a better sense of how this advanced functionality works, check out these examples:

If you've heard of go-style CSP concurrency, such as in Clojure's core.async, or in various JS ports such as @jlongster's js-csp fork (also, read his blog post) of ubolonton's js-csp, asynquence has a (nearly-identical) API emulation layer that you can drop on top of asynquence's CSP-flavored runner(..) mechanism described above to express channel-based concurrency.

For example:

ASQ()
.runner(
    ASQ.csp.go(function*(ch){
        yield ASQ.csp.put(ch,42);
    }),
    ASQ.csp.go(function*(ch){
        yield ASQ.csp.take( ASQ.csp.timeout(1000) );
        console.log( yield ASQ.csp.take(ch) ); // 42 
    })
)
.val(function(){
    console.log("all done");
});

To use the go-style API emulation layer, you'll need (at least) the iterable() and pThen()/pCatch() contrib plugins.

In the browser:

<script src="asq.js"></script>
<script src="contrib.js"></script>
<script src="asq-go-csp.js"></script>

In node:

var ASQ = require("asynquence");
 
require("asynquence-contrib");
require("asynquence-contrib/asq-go-csp.js");

go-style CSP can be a very powerful abstraction for certain concurrency tasks, so using this API emulation layer gives you even more choices for expressing and managing async flow control in your JS programs.

Consider this kind of ugly code:

$("#button").click(function(evt){
   ASQ(this.id)
   .then(..)
   .seq(..)
   .then(..)
   .val(..)
});

Each time the button is clicked, a new sequence is defined and executed to "react" to the event. But it's a little awkward and ugly that the sequence must be (re)defined each time, inside the event listener.

The react plugin separates the capabilities of listening for events and of responding to them, providing first-class syntactic support for the asynquence "reactive sequence" pattern, inspired by RxJS Reactive Observables. It essentially combines asynquence's flow-control with repeatable event handling.

  1. react(..) accepts a listener setup handler, which will receive a reactive trigger (called proceed in the snippet below) that your event listener(s) invoke to "react" to an event. It will also receive a function you can call one or more times to register a teardown handler (to unbind event handlers, etc).

  2. The rest of the chain sets up a normal asynquence sequence, which will then be repeat-executed each time the reactive trigger is fired. The reactive sequence also has an added stop() method, which you can use to trigger any registered teardown handlers and stop all reactive sequence handling.

The react plugin reverses the paradigm of the first snippet, providing a way to specify the sequence externally and once, and have it be re-triggered each time an event fires.

var rsq = ASQ.react(
   // this listener setup handler will be called only once 
   function setup(proceed,registerTeardownHandler){
      // fire off a new sequence for each click 
      function handler(evt) {
         // we can call `proceed(..)` (or whatever you want 
         // to call the param!) every time our stream/event 
         // fires, instead of just once like normal promise 
         // resolution 
         proceed(this.id);
      }
 
      $("#button").click(handler);
 
      // register a handler to be called when tearing down 
      // the reactive sequence handling 
      registerTeardownHandler(function(){
         $("#button").unbind("click",handler);
      });
 
      // inside our `setup` handler, `this` will point to 
      // the reactive sequence, which has a `stop()` method 
      // that tears down the reactive sequence handling 
      EVTHUB.on("finish",this.stop);
   }
)
// each time our reactive event fires, 
// process the rest of this sequence 
.then(..)
.seq(..)
.then(..)
.val(..);
 
// later, to stop the reactive sequence handling: 
EVTHUB.on("totally-done",rsq.stop);

Inside the react(..) listener setup function, you can set up as many listeners for any kind of events (ajax, timers, click handlers, etc) as you want, and for each, all you need to do to fire off the sequence is call the proceed(..) (or whatever you want to name it!) callback. Whatever messages you pass to proceed(..) will pass along to the first step of the sequence instance.
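The core idea (define the chain once, re-run it per event, tear down on stop) can be sketched without any library. The reactSketch(..) function below is hypothetical and much simpler than the react plugin: it runs synchronously, supports only val-like steps, and uses a hand-rolled listener list in place of real DOM or stream events.

```javascript
// Hypothetical sketch of a reactive sequence: steps are registered
// once and re-run every time `proceed(..)` is called.
function reactSketch(setup) {
    var steps = [];
    var teardowns = [];
    var stopped = false;

    function proceed(msg) {
        if (stopped) return;
        // run the whole chain for this event, passing each step's
        // return value on to the next step
        steps.reduce(function (val, fn) {
            return fn(val);
        }, msg);
    }

    function registerTeardownHandler(fn) {
        teardowns.push(fn);
    }

    var rsq = {
        val: function (fn) {
            steps.push(fn);
            return rsq;
        },
        stop: function () {
            stopped = true;
            teardowns.forEach(function (fn) { fn(); });
            teardowns.length = 0;
        }
    };

    setup.call(rsq, proceed, registerTeardownHandler);
    return rsq;
}

// stand-in event source for the example
var listeners = [];
function emit(v) {
    listeners.slice().forEach(function (fn) { fn(v); });
}

var seen = [];

var sketchSeq = reactSketch(function setup(proceed, registerTeardownHandler) {
    listeners.push(proceed);
    registerTeardownHandler(function () {
        listeners.splice(listeners.indexOf(proceed), 1);
    });
})
.val(function (v) { return v * 2; })
.val(function (v) { seen.push(v); });

emit(1);
emit(2);
sketchSeq.stop();
emit(3); // ignored: the teardown handler removed the listener
// `seen` is [ 2, 4 ]
```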

The proceed function has two helpers on it for dealing with streams (particularly node streams): proceed.onStream(..) and proceed.unStream(..). onStream(..) takes one or more streams and subscribes the data and error events to call the proceed function. unStream(..) takes one or more streams to unsubscribe, so you would likely use it in a registered teardown handler. For example:

var rsq = ASQ.react(function(proceed,registerTeardownHandler){
    proceed.onStream( mydatastream );
 
    registerTeardownHandler(function(){
        proceed.unStream( mydatastream );
    });
})
.val(function(v){
    if (v instanceof Error) throw v;
    // .. 
})
// .. 
.or(function(err){
    console.log(err);
});

For a more real-world type of example, see reactive sequences + gate(). Here's another example, which handles http request/response streams with reactive sequences.

In the browser, include the contrib.js file along with the asynquence library file (asq.js). Doing so automatically extends the API with the plugins.

In node.js, you install the asynquence-contrib package alongside the asynquence package. Note: The asynquence-contrib package will return the asynquence instance for you, so you technically only need this if using both:

// Note: requiring "asynquence" not strictly needed here, 
// since contrib will retrieve and return it automatically 
 
var ASQ = require("asynquence-contrib");

They can then be used together directly, like this:

ASQ()
.try(foo)
.until(bar)
.then(baz);

Note: If you load contrib bundle(s) that cannot find a peer asynquence top-level package to load and use, a dependency-injection function is instead returned, which expects to be called with either an asynquence instance, or a relative path specifying where to load it.

There is a utility provided to bundle the contrib plugins.

bundle.js usage:
  bundle.js [ {OPTION} .. ] [ {PLUGIN-NAME} .. ]
 
--help                    prints this help
--wrapper=filename        wrapper filename ("contrib-wrapper.js")
--bundle=filename         bundle filename ("contrib.src.js")
--min-bundle=filename     minified-bundle filename ("contrib.js")
--exclude={PLUGIN-NAME}   exclude a plugin from bundling
 
If you don't pass any {PLUGIN-NAME} parameters, all available plugins
(except any that are --exclude omitted) will be bundled.
 
If you pass one or more {PLUGIN-NAME} parameters, only the ones
specified (except any that are --exclude omitted) will be bundled.

bundle.js by default builds the unminified bundle contrib.src.js, and then builds (minifies) contrib.js. The recommended way to invoke this utility is via npm:

npm run-script bundle

By default, the build includes all the contrib/plugin.*.js plugins. But, you can manually specify which plugins you want, by name. For example, to bundle only the any, none, and try plugins:

./bundle.js any none try

Note: npm run-script .. doesn't currently support passing any extra command line parameters, so you must use ./bundle.js instead of npm run-script bundle if you want to specify parameters to the bundle script.

By passing option parameters to the bundle script, you can override the default filenames used for the contrib plugin wrapper (--wrapper=..), bundle (--bundle=..), and minified-bundle (--min-bundle=..). These options are useful for creating multiple variations of the plugin bundle.

The code and all the documentation, unless otherwise noted, are released under the MIT license.

http://getify.mit-license.org/