A small CPS-style utility library for node.js
This module implements an asynchronous job queueing system with the WorkingQueue class. Jobs are processed concurrently up to a given degree of "concurrency"; of course, if this degree is 1, then jobs are processed sequentially.
The basic use case of this module is when you want to perform a bunch (hundreds or thousands) of I/O related tasks, for instance HTTP requests. In order to play nice with others and make sure the I/O stack (file descriptors, TCP/IP stack etc.) won't be overwhelmed by thousands of concurrent requests, you have to put an artificial limit on the number of requests that are launched concurrently. When this limit is reached, jobs have to be queued until some other job is finished. And that's exactly what WorkingQueue is for!
This class is instantiated with a single parameter: the concurrency level of the queue. Again, if the concurrency level is 1, then all jobs will be processed sequentially.

    var WorkingQueue = require('capisce').WorkingQueue;
    var queue = new WorkingQueue(16);

You can then launch jobs using the perform() method. If the current concurrency limit has not been reached, then the job will be scheduled immediately. Otherwise, it is queued for later execution.
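
To make this scheduling rule concrete, here is a minimal sketch of a concurrency-limited queue. It is not capisce's actual implementation: `MiniQueue`, `running`, `pending` and `_schedule` are hypothetical names chosen for illustration, and the demo jobs simply park their `over` callback in an array to stand in for pending I/O.

```javascript
// Minimal sketch of a concurrency-limited job queue (NOT capisce's actual code).
function MiniQueue(concurrency) {
    this.concurrency = concurrency; // maximum number of jobs running at once
    this.running = 0;               // jobs started but not yet over
    this.pending = [];              // jobs waiting for a free slot
}

MiniQueue.prototype.perform = function (job) {
    this.pending.push(job);
    this._schedule();
    return this; // allow chaining
};

MiniQueue.prototype._schedule = function () {
    var self = this;
    // Start queued jobs as long as the concurrency limit is not reached.
    while (this.running < this.concurrency && this.pending.length > 0) {
        var job = this.pending.shift();
        this.running += 1;
        job(function over() {
            // A finished job frees a slot, so try to start the next one.
            self.running -= 1;
            self._schedule();
        });
    }
};

// Demo: three jobs on a queue of concurrency 2.
var queue = new MiniQueue(2);
var started = [];
var finishers = []; // over() callbacks of the jobs that are still running

function makeJob(name) {
    return function (over) {
        started.push(name);
        finishers.push(over); // a real job would call over() from an I/O callback
    };
}

queue.perform(makeJob('a')).perform(makeJob('b')).perform(makeJob('c'));
console.log(started);  // 'a' and 'b' have started, 'c' is still queued
finishers.shift()();   // job 'a' is over, which frees a slot for 'c'
console.log(started);  // now 'c' has started too
```

The point of the sketch is that a job only leaves the waiting list when a running job reports that it is over, which is why forgetting to call over would stall the queue.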
Jobs are simple functions that are passed a very important parameter: the over() function. The job MUST call the over function at the end of its process to signal the WorkingQueue that it is, well, over.

    queue.perform(function(over) {
        console.log("Hello, world !");
        over();
    });

The over() function can be passed around inside your job. In fact, it's the only way to do interesting things: since I/O is asynchronous, you have to call over once the I/O request is over, that is to say in an event handler or completion callback.
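
    var fs = require('fs');
    queue.perform(function(over) {
        console.log("Reading file...");
        fs.readFile('README.md', function(err, result) {
            console.log("Over !");
            if (err) {
                console.error(err);
            } else {
                process.stdout.write(result);
            }
            // Signal the queue only once the I/O callback has run
            over();
        });
    });
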
Of course you can name the over function any way you want. Other similar libraries like to call it
Before capisce 0.4.0, jobs were functions that could only take one parameter, the over() function. This forced any job parameters to be passed through the closure mechanism, which may have undesirable memory or performance downsides.
Since capisce 0.4.0, you can pass additional arguments to the perform() call and they will be passed right along to your job, before the over function. Internally the data stored in the queue is [job, arg1, arg2...] so there are no surprises regarding memory usage.
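
As an illustration of that storage layout, the sketch below shows one way an entry shaped like `[job, arg1, arg2...]` can be turned back into a call with over as the last argument. The helpers `makeEntry` and `runEntry` are hypothetical and are not part of capisce; they only mirror the description above.

```javascript
// Sketch only: hypothetical helpers, not capisce's internals.
function makeEntry(job /*, arg1, arg2, ... */) {
    // Copy the arguments object into a plain array: [job, arg1, arg2, ...]
    return Array.prototype.slice.call(arguments);
}

function runEntry(entry, over) {
    var job = entry[0];
    // Job arguments first, the over function always last
    var args = entry.slice(1).concat([over]);
    job.apply(null, args);
}

var output = [];
function greet(word1, word2, over) {
    output.push(word1 + ', ' + word2 + ' !');
    over();
}

var entry = makeEntry(greet, 'Hello', 'world');
var finished = false;
runEntry(entry, function () { finished = true; });
console.log(output[0]); // prints "Hello, world !"
```

Only the job reference and its arguments are kept in the array, so no extra closure is needed per queued job.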
Here is a sample of parameter passing :

    // Note how the over function is passed as the last parameter
    function myJob(word1, word2, over) {
        console.log('' + word1 + ', ' + word2 + ' !');
        over();
    }
    queue.perform(myJob, 'Hello', 'world');
    queue.perform(myJob, 'Howdy', 'pardner');

As a shortcut, from capisce 0.6.0, you can process all elements in collections using
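
    // assert is Node's assert module; done is presumably the callback of the enclosing mocha test
    var queue = new WorkingQueue(16);
    var result = 0;
    var list = [1, 1, 2, 3, 5, 8, 13];
    // The parameter list was lost in this copy: the order (element, index, k, over) is an assumption
    function adder(element, index, k, over) {
        if (index % 2 == 0) {
            result += element + k;
        }
        over();
    }
    queue.processList(adder, list, 5);
    queue.onceDone(function() {
        assert.equal((1 + 5) + (2 + 5) + (5 + 5) + (13 + 5), result);
        done();
    });
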
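
    var queue = new WorkingQueue(16);
    var list = { 'Nicolas': 30, 'Loïc': 3, 'Lachlan': 0.75 };
    // The constructor name and the parameter lists were lost in this copy:
    // Mean and (key, value, f, over) are assumptions
    function Mean() {
        var total = 0.0;
        var count = 0;
        this.process = function(key, value, f, over) {
            total += value + f(value);
            count += 1;
            over();
        };
        this.result = function() {
            return total / count;
        };
    }
    var mean = new Mean();
    queue.processObject(mean.process, list, function(value) { return value + 5; });
    queue.onceDone(function() {
        assert.equal((30 + (30 + 5) + 3 + (3 + 5) + 0.75 + (0.75 + 5)) / 3, mean.result());
        done();
    });
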
When queuing a bunch of jobs, it is often required to wait for all jobs to complete before continuing a process. For that you use the
onceDone() method :

    function myJob(name, over) {
        var duration = Math.floor(Math.random() * 1000);
        console.log("Starting "+name+" for "+duration+"ms");
        setTimeout(function() {
            console.log(name + " over, duration="+duration+"ms");
            over();
        }, duration);
    }
    var i;
    for(i=0;i<1000;i++) {
        queue.perform(myJob, "job-"+i);
    }
    queue.onceDone(function() {
        console.log("All done !");
    });

The onceDone() method can be called multiple times to register multiple handlers, and the handlers will be called in the same order they were added. Maybe I should have used the EventEmitter pattern for this.
Since 0.4.1, the conditions under which onceDone callbacks are called are defined more precisely. onceDone callbacks will be called:
- when the last job from the queue is over
- when you call doneAddingJobs() and no job was performed since the last
This is required to handle the case when a queue may or may not receive jobs, and you want cleanup callbacks to be called in both situations. In this case you would do :

    function myJob(name, over) {
        var duration = Math.floor(Math.random() * 1000);
        console.log("Starting "+name+" for "+duration+"ms");
        setTimeout(function() {
            console.log(name + " over, duration="+duration+"ms");
            over();
        }, duration);
    }
    var i;
    // In some cases you can have no queued jobs
    var count = Math.floor(Math.random() * 10); // between 0 and 9 jobs
    for(i=0;i<count;i++) {
        queue.perform(myJob, "job-"+i);
    }
    queue.onceDone(function() {
        console.log("All done with "+i+" jobs !");
    });
    // This does nothing if jobs were added to the queue
    // If no jobs were added, the onceDone callbacks are called
    queue.doneAddingJobs();

doneAddingJobs() is not mandatory: it's only needed if you want to make sure the onceDone callbacks are called even if no job was actually performed.
As the name implies, onceDone callbacks are called only once, and forgotten immediately after. This is an important change in capisce 0.5.0; previously those callbacks were kept and repeatedly called whenever the done situation was encountered. This created a lot of opportunities for memory leaks. If you want to reproduce the same behaviour as before, re-register your callback using onceDone from within your callback; capisce supports that.
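
The "called once, then forgotten" behaviour, and the re-registration trick, can be sketched as follows. This is an illustration under assumptions, not capisce's code: `OnceCallbacks`, `add` and `fire` are hypothetical names standing in for the queue's internal bookkeeping.

```javascript
// Sketch of "call once, then forget" callbacks (hypothetical helper, not capisce code).
function OnceCallbacks() {
    this.callbacks = [];
}

OnceCallbacks.prototype.add = function (callback) {
    this.callbacks.push(callback);
};

OnceCallbacks.prototype.fire = function () {
    // Detach the list before calling anything, so that a callback which
    // re-registers itself is kept for the NEXT fire() instead of running again now.
    var toCall = this.callbacks;
    this.callbacks = [];
    toCall.forEach(function (callback) { callback(); }); // registration order
};

var doneHandlers = new OnceCallbacks();
var onceCount = 0;
var repeatCount = 0;

// Forgotten after the first fire()
doneHandlers.add(function () { onceCount += 1; });

// Re-registers itself to mimic the pre-0.5.0 "called every time" behaviour
doneHandlers.add(function again() {
    repeatCount += 1;
    doneHandlers.add(again);
});

doneHandlers.fire();
doneHandlers.fire();
doneHandlers.fire();
console.log(onceCount, repeatCount); // prints "1 3"
```

Because the list is emptied on every fire, a handler that does not re-register itself cannot be retained by accident, which is the memory-leak concern mentioned above.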
Since capisce 0.2.0, if you want to fill in the queue first, then launch jobs later, you can use the hold and go methods :

    queue.hold(); // Hold queue processing
    for(i=0;i<1000;i++) {
        queue.perform(myJob, "job-"+i);
    }
    queue.go(); // Resume queue processing

Since capisce 0.3.0, you can use the
wait() method as a convenient way to include delays into job execution.

    var queue = new WorkingQueue(1); // Basically, a sequence
    queue.perform(function(over) {
        console.log("Waiting 5 seconds...");
        over();
    }).wait(5000).perform(function(over) {
        console.log("done !");
        over();
    });

Of course the above example would be useless with some concurrency in the queue. If you want concurrency, you can pass a job parameter to

    var queue = new WorkingQueue(16);
    queue.perform(function(over) {
        console.log("First job done");
        over();
    }).wait(5000, function(over) {
        console.log("Second job started after 5 seconds");
        over();
    });

CollectingWorkingQueue is just a wrapper around WorkingQueue that does the very common task of collecting the result of each job. When using CollectingWorkingQueue, the over function takes the err, result of the job as parameters, and the onceDone handler receives the array of job results (as [jobId, err, result] sub-arrays). It is up to you to sort this array if you want the results in the same order the jobs were submitted.
Note : before version 0.4.5, the sample below had a bug.
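
    var queue2 = new CollectingWorkingQueue(16);
    // Function builder: returns a job bound to the given name
    function myJob(name) {
        return function(over) {
            var duration = Math.floor(Math.random() * 1000);
            console.log("Starting "+name+" for "+duration+"ms");
            setTimeout(function() {
                console.log(name + " over, duration="+duration+"ms");
                over(null, "result-"+name);
            }, duration);
        };
    }
    var i;
    for(i=0;i<1000;i++) {
        queue2.perform(myJob("job-"+i));
    }
    queue2.onceDone(function(results) {
        console.log("All done !");
        console.log("Before sorting : ");
        console.log(results[0]);
        console.log(results[999]);
        results.sort();
        console.log("After sorting : ");
        console.log(results[0]);
        console.log(results[999]);
    });
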
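
One detail about sorting is worth spelling out. The sketch below uses made-up results shaped like [jobId, err, result]; it assumes job ids are numbers assigned in submission order, which the text above does not state. Under that assumption, a plain `sort()` orders the sub-arrays as strings, so an explicit comparator is needed to get back the submission order.

```javascript
// Hypothetical results in completion order, shaped like [jobId, err, result].
// Numeric, submission-ordered job ids are an assumption made for this sketch.
var results = [
    [10, null, 'result-job-10'],
    [2, null, 'result-job-2'],
    [1, null, 'result-job-1']
];

// Array.prototype.sort() without a comparator compares elements as strings,
// so "10,..." sorts before "2,...".
var lexicographic = results.slice().sort();

// An explicit comparator on the job id restores numeric order.
var bySubmission = results.slice().sort(function (a, b) {
    return a[0] - b[0];
});

console.log(lexicographic.map(function (r) { return r[0]; })); // [ 1, 10, 2 ]
console.log(bySubmission.map(function (r) { return r[0]; }));  // [ 1, 2, 10 ]
```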
Since capisce 0.4.5, you can pass parameters to jobs, just like you would do with a standard WorkingQueue. Once again, this saves you from using function builders (however, a closure is still built behind the scenes).

    var queue2 = new CollectingWorkingQueue(16);
    function myJob(name, over) {
        var duration = Math.floor(Math.random() * 1000);
        console.log("Starting "+name+" for "+duration+"ms");
        setTimeout(function() {
            console.log(name + " over, duration="+duration+"ms");
            over(null, "result-"+name);
        }, duration);
    }
    var i;
    for(i=0;i<1000;i++) {
        queue2.perform(myJob, "job-"+i);
    }
    queue2.onceDone(function(results) {
        console.log("All done !");
        console.log("Before sorting : ");
        console.log(results[0]);
        console.log(results[999]);
        results.sort();
        console.log("After sorting : ");
        console.log(results[0]);
        console.log(results[999]);
    });

Also since capisce 0.4.5, you can call
CollectingWorkingQueue.go() just like with
capisce.sequence() can be used as a shortcut :

    var capisce = require('capisce');

    // These three code blocks are equivalent :

    // Basic version
    var queue = new capisce.WorkingQueue(1);
    queue.perform(job1);
    queue.perform(job2);
    queue.perform(job3);

    // Using capisce.sequence() :
    capisce.sequence().perform(job1).then(job2).then(job3);

    // capisce.sequence() accepts a job as parameter :
    capisce.sequence(job1).then(job2).then(job3);

    // capisce.sequence() also accepts job parameters.
    // By the way, perform is good to use, too
    capisce.sequence(job1, param1).then(job2).perform(job3, param2);

Unlike perform(), then() doesn't accept job parameters. This is due to a feature, which I'd rather remove in the near future, that allows then() to create concurrent blocks within a sequence.
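
The idea behind a sequence, where each job's over() is what starts the next job, can be sketched without the library. The helper `runInSequence` below is a hypothetical name and is not part of capisce; it only illustrates the continuation-passing pattern that a queue of concurrency 1 relies on.

```javascript
// Sketch only: run CPS-style jobs one after another (not capisce's code).
function runInSequence(jobs, whenAllOver) {
    var index = 0;
    function next() {
        if (index >= jobs.length) {
            whenAllOver();
            return;
        }
        var job = jobs[index];
        index += 1;
        job(next); // the job's over() is what starts the following job
    }
    next();
}

var trace = [];
function job1(over) { trace.push('job1'); over(); }
function job2(over) { trace.push('job2'); over(); }
function job3(over) { trace.push('job3'); over(); }

runInSequence([job1, job2, job3], function () {
    trace.push('all over');
});
console.log(trace.join(' -> ')); // prints "job1 -> job2 -> job3 -> all over"
```

The demo jobs call over() synchronously to keep the output deterministic; a real job would typically call it from an I/O callback, and the ordering guarantee would be the same.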
- 0.6.2 (2012-11-02) : Fixed a bug that broke onceDone semantics when calling processObject on empty lists / objects.
- 0.6.1 (2012-10-29) : processDictionary support extra callback arguments.
- 0.6.0 (2012-10-29) : Added
- 0.5.0 (2012-10-29) :
whenDone is deprecated, use
onceDone callbacks are called only once.
- 0.4.5 (2012-08-28) :
CollectingWorkingQueue.perform() now accepts parameters for the job, just like
WorkingQueue.go(). Fixed a bug wherein the (optional) job passed to
sequence() was not scheduled.
- 0.4.4 (2012-08-28) : wrote proper unit tests using mocha (
npm test to launch them).
- 0.4.3 (2012-05-03) : with the help of @penartur, fixed a problem where a single worker was launched after a
- 0.4.2 (2012-03-16) : fixed a problem with
- 0.4.1 (2012-03-15) : clarified behavior of
- 0.4.0 (2012-02-15) : WorkingQueue.perform() now accepts extra parameters that are passed to the job when it is scheduled.
- 0.3.1 (2012-02-12) : new behavior for
WorkingQueue.whenDone(), not so satisfying.
- 0.3.0 (2012-02-10) : Added the
- 0.2.0 (2012-02-10) : Added
- 0.1.0 (2012-02-09) : Initial version.