ppgs-pg

0.0.4 • Public • Published

PPG's Promise Generator Library

Queue

A CSP style queue implemented with Promises.

Creating queues

var Queue = require('ppgs-pg').Queue;
var queue = new Queue(5); // buffer capacity 5
var queueWithUnlimitedCapacity = new Queue(); // unlimited buffer capacity
                                              // not recommended

Putting items

queue.put(x).then(() => console.log("put"));

Getting items

queue.get().then((x) => console.log(x));

Getting current buffer size

queue.size()

Check if buffer is full or empty

queue.isFull();
queue.isEmpty();

Channel

A channel is essentially a queue with 0 capacity. Every put() only resolves when get() is called, and vice versa.


Pipeline

A pipeline implementation that takes a series of asynchronous processes and runs them concurrently.

Rationale

Consider the case where multiple asynchronous functions that can be run concurrently need to be run on a number of inputs to complete a task.

co(function *() {
    ...
    for (var w of workSet) {
        yield Promise.all([
            asyncFunc1(w),
            asyncFunc2(w),
            asyncFunc3(w),
            asyncFunc4(w),
            asyncFunc5(w),
            ...
        ]);
    }
    ...
});

If these asynchronous functions take different times to complete, the time for each input is determined by the slowest function, and the faster functions sit idle until it finishes.

Instead of running all of the functions on a single input concurrently, a pipeline runs all of the functions on the same input sequentially, and different inputs concurrently. This has several significant advantages compared to the method above.

  • Each function only waits on the previous function with the help of buffers.
  • Multiple instances of the same function can be run to "speed up" slow ones.
  • Even if the functions must be run sequentially, a pipeline can still provide concurrency.

Creating Pipelines

var Pipeline = require('ppgs-pg').Pipeline;
var pipeline = new Pipeline();

Adding Processes

Adding a generator

pipeline.addStation({
    f: function *() {
    }
});

Adding a plain old function that returns promises

pipeline.addStation({
    f: function () {
        return new Promise((resolve, reject) => {});
    }
});

Setting minWorkers and maxWorkers. These control the minimum and maximum number of concurrent invocations of your process.

pipeline.addStation({
    f: function *() {
    },
    minWorkers: 2,
    maxWorkers: 5
});

Putting Data

pipeline.put(x).then(() => {});

Getting Data

pipeline.get().then((x) => {});

Handling Errors

These have the same effect.

pipeline.addStation({
    f: function *() {
        throw 'my error';
    }
});
pipeline.addStation({
    f: function *() {
        yield Promise.reject('my error');
    }
});
pipeline.addStation({
    f: function () {
        return Promise.reject('my error');
    }
});
pipeline.addStation({
    f: function () {
        return new Promise((resolve, reject) => {
            reject('my error');
        })
    }
});

Catching the error.

pipeline.get().catch((e) => { e == 'my error' });

A Helper Function

A helper function that converts an array of functions/generators into a pipeline.

var pipelinefy = require('ppgs-pg').pipelinefy;
 
var pipeline = pipelinefy(
    // an array of functions/generators/station configuration objects
    [
        function (x) { /* ...  */ },
        function *(x) { /* ... */ },
        { // this has the same structure as Pipeline#addStation(opts)
            f: function *() { /* ...  */ },
            minWorkers: 2,
            maxWorkers: 10
        }
    ],
    // pipeline configurations & default station parameters
    {
        feed: new Queue(1),
        queueCapacity: 5,
        minWorker: 1,
        maxWorker: 1
    });
 
// use pipeline

Benchmarking

npm run pipeline-benchmark

An Example

pipeline-example.js


Select

A function to select among available channels and queues. It mimics the behavior of the select statement in Go, except that it can only select on read (get) operations.

An example

This is an example that produces the first 10 elements in the Fibonacci sequence.

"use strict";
 
var { Queue, Channel, select } = require('ppgs-pg');
var co = require('co');
 
var c = new Channel,
    next = new Channel,
    quit = new Channel;
 
co(function *() {
    var n0 = 0, n1 = 1;
    var f = select([next, quit]);
    while (true) {
        var { s, x } = yield f();
        if (s === next) {
            yield c.put(n1);
            var tmp = n1;
            n1 = n1 + n0;
            n0 = tmp;
        } else if (s === quit) {
            console.log('quit');
            break;
        }
    }
});
 
co(function *() {
    for (var i = 0; i < 10; ++i) {
        yield next.put(0);
    }
    yield quit.put(0);
});
 
co(function *() {
    while (true) {
        console.log(yield c.get());
    }
});

Another Example

readers-writer-lock-example.js is another example that implements a readers-writer lock using channels and select.


Tests

npm test

Versions

Current Tags

  • Version
    Downloads (Last 7 Days)
    • Tag
  • 0.0.4
    0
    • latest

Version History

  • Version
    Downloads (Last 7 Days)
    • Published
  • 0.0.4
    0
  • 0.0.3
    0
  • 0.0.2
    0
  • 0.0.1
    0

Package Sidebar

Install

npm i ppgs-pg

Weekly Downloads

0

Version

0.0.4

License

MIT

Last publish

Collaborators

  • panbotao