async-csp
Communicating sequential processes for use with ES2016's async/await syntax.
Here's GoLang's ping/pong example in async-csp
flavor:
{ return } { while true let ball = await table; if ball === ChannelDONE console; break; ballhits++; console; await ; await table; } { console; let table = ; ; ; console; let ball = hits: 0; await table; await ; console; table; await table; console; console;}
Sometimes the output of this example is
Opening ping-pong channel!
Serving ball...
ping! Hits: 1
pong! Hits: 2
ping! Hits: 3
pong! Hits: 4
ping! Hits: 5
pong! Hits: 6
ping! Hits: 7
pong! Hits: 8
ping! Hits: 9
Closing ping-pong channel...
pong: table's gone!
Channel is fully closed!
Ball was hit 9 times!
ping: table's gone!
and sometimes it's
Opening ping-pong channel!
Serving ball...
ping! Hits: 1
pong! Hits: 2
ping! Hits: 3
pong! Hits: 4
ping! Hits: 5
pong! Hits: 6
ping! Hits: 7
pong! Hits: 8
ping! Hits: 9
pong! Hits: 10
Closing ping-pong channel...
ping: table's gone!
Channel is fully closed!
Ball was hit 10 times!
pong: table's gone!
Sometimes the ball is hit 9 times, and sometimes 10! This is due to the nature of asynchronicity which is nicely depicted in this example.
Installation
npm install async-csp
Default Task
- Install node.js
- Clone the async-csp project
- Run
npm install
- Run
gulp
- Executes tests
- Cleans dist
- Lints source
- Builds source
- Watches source and tests
Examples
Examples can be found here.
To run any example, make sure the default task has been successfully run once (or at least npm install
), then run node index.js
from the root folder of the example.
Usage
Note: All of the code pieces below are assumed to be executed from an async
context, so await
is available at the base level. To read more about these methods, see this proposal for async/await in ES7.
Data Flow
A Channel
is a container which makes use of Promises
to handle the incoming and outgoing flow of data.
To put a value on a Channel
use Channel#put()
, and to take a value from the channel use Channel#take()
.
By default, the promise returned from Channel#put()
will not resolve until its value is taken from the channel, and the promise returned from Channel#take()
will not resolve until a value can be taken from the channel.
; let channel = ; { await ch; // resolves when the first ch.take() is executed await ch; // resolves when the second ch.take() is executed await ch; // resolves when the third ch.take() is executed} { console; // resolves to 1, from the first ch.put() console; // resolves to 2, from the second ch.put() console; // resolves to 3, from the third ch.put()} ;;
Buffering
A Channel
can be created with a buffer for receiving puts.
Essentially, this means a put can resolve while space is available on the buffer, even if no take is waiting to receive a value.
As soon as the buffer becomes full, put will begin blocking again until a take clears a space from the buffer.
To create a Channel
with a buffer, pass in a Number
as the first argument to the constructor.
; let channel = 2; // buffer size of 2 { await ch; //=> this can resolve immediately, taking one space on the buffer console; // fires immediately await ch; //=> this can also resolve immediately, taking the second space on the buffer console; // also fires immediately await ch; //=> buffer is full! this will block until another process takes a value from the Channel console; // fires after the unblock!} { console; //=> resolves to 1, clears a space on the buffer and allows the blocked ch.put(3) to also resolve console; //=> resolves to 2 console; //=> resolves to 3} // execute the puts right away; // use a helper method to wait for 1 second// to help show the effects of blockingawait ; // after 1 second, start executing takes;
Non-blocking puts
A common use for a Channel
requires data to be input from a non async context, or without waiting for the put to resolve.
In this scenario, do not await the result of Channel#put()
.
let ch = ; // non-blocking puts, don't use `await`ch;ch;ch; console; //=> 1console; //=> 2console; //=> 3
Transforming
When constructing a Channel
, you can pass in a callback to transform values as they are taken.
let ch = x * 2; ch;ch;ch; console; //=> 2console; //=> 4console; //=> 6
If values should be dropped from the Channel
, simply return undefined
from the transform callback.
let ch = { if x > 2 return x;}; ch;ch;ch;ch; console; //=> 3console; //=> 4
If a transform needs to expand a single value into multiple values, use the push
parameter with the transform.
Note that when using this callback style, all values must be sent through push
.
Any value returned from the transform callback will be ignored
when the provided transformer has more than one parameter defined.
let ch = { ; ;}; ch;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
If the transform needs to work asynchronously, there are a few ways to accomplish this.
The first is to use an async callback.
let ch = async { await ; return x;}; ch;ch; console; //=> 1console; //=> 2
The second way to use an asynchronous transform is by passing in an async callback with a parameter length of 2.
Similar to the non-async callback with a parameter length of 2, all values must be sent through push
,
and returned values will be ignored.
let ch = asyncx push await ; ; await ; ;; ch;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
The final way to use an asynchronous transform is with a three-parameter callback. To signify that the transform has completed, execute the third argument.
let ch = { ; ;}; ch;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
One final note: Using a transform does not prevent you from simultaneously using a buffer.
To use a transform with a buffered Channel
, pass in the buffer size as the first argument, and the transform as the second.
let ch = 2 x + 1; // note that puts will be resolved immediately, since we have space on the bufferawait ch;await ch; console; //=> 2console; //=> 4
Channel#pipe()
Similarly to Streams
, Channels
can be piped from one to another.
let ch1 = ;let ch2 = ;let ch3 = ; ch1;/* +---+ |ch1| +---+ | V +---+ |ch2| +---+ | V +---+ |ch3| +---+*/ ch1;ch1;ch1; console; //=> 1console; //=> 2console; //=> 3
A Channel
can be piped to multiple destinations.
In this case, downstream Channels
will receive every value from upstream.
let ch1 = ;let ch2 = ;let ch3 = ; ch1; // or `ch1.pipe(ch2); ch1.pipe(ch3);`/* +---+ +-|ch1|-+ | +---+ | | | V V +---+ +---+ |ch2| |ch3| +---+ +---+*/ ch1;ch1;ch1; // 1 is taken from ch1 and put on ch2 and ch2 console; //=> 1console; //=> 1 // 2 is taken from ch1 and put on ch2 and ch3 console; //=> 2console; //=> 2 // 3 is taken from ch1 and put on ch2 and ch3 console; //=> 3console; //=> 3
Also take note that if one downstream Channel
is blocked from a currently unresolved Channel#put()
(buffered or non-buffered), then the entire pipe will be blocked.
In the example above, an attempt to take all 3 values from ch2
before taking any values from ch3
would have resulted in deadlock.
Finally, any piped Channel
will also execute transforms.
let ch1 = x + 2;let ch2 = x;let ch3 = x: x ; ch1; ch1;ch1;ch1; console; //=> { x: '3' }console; //=> { x: '4' }console; //=> { x: '5' }
Channel.pipeline()
Channel.pipeline()
is a helper method for creating piped channels from any number of callbacks.
Callbacks can be provided either as separate arguments, or contained in an array as the first argument.
Channel.pipeline()
will return an array containing the first and the last Channel
in the pipeline.
let ch1 ch3 = Channel; ch1;ch1;ch1; console; //=> { x: '3' }console; //=> { x: '4' }console; //=> { x: '5' }
Channel#unpipe()
If a Channel
should be taken out of an existing pipe, use Channel#unpipe()
.
let ch1 = ;let ch2 = ;let ch3 = ; ch1; ch1;console; //=> 1 // now take ch2 out of the pipech1; ch1;console; //=> 2, note that we took from ch1 // note that ch2 is still piping to ch3ch2;console; //=> 3
Channel#merge()
Channel.merge()
is a helper method for piping multiple Channels
into a single, new Channel
.
let ch1 = ;let ch2 = ;let ch3 = ch1; // or, `ch3 = Channel.merge(ch1, ch2)` ch1;ch2; console; //=> 1console; //=> 2
Channel#close()
A Channel
has 3 states: open, closed, and ended. An open Channel
can be written to, a closed Channel
will not accept any new values but may be non-empty, and an ended Channel
is both closed and empty.
To signify that a Channel
should be done accepting new values, execute Channel#close()
. Data can still be taken from the channel after that point, but no more values can be added.
let ch1 = ; ch1;ch1; ch1; ch1; // resolves immediately with value of Channel.DONE console; //=> 1console; //=> 2console; //=> Channel.DONE
If Channels
are piped together, and you want the entire pipeline to close when possible, simply pass true
as an argument to Channel#close()
.
let ch1 = ;let ch2 = ;ch1; ch1;ch1; ch1; console; //=> 1console; //=> 2console; //=> Channel.DONE
Channel#done()
In order to wait for a channel to be ended (closed and empty), await the resolution of done
.
let ch = ; ch;ch;ch; let arr = ;async await ; arr; await ; arr;; await ch; // will not resolve until the async IIFE takes both values from the channelconsole; //=> [ 1, 2 ]
Channel#tail()
While manually appending values to a Channel
can be accomplished,
it often becomes significantly more difficult
when items such as pipes and asynchronous transforms are in play.
For simplicity, Channel#tail()
is provided as an alternative method for
providing values to Channel#take()
only after the Channel
is closed
and all existing Channel#put()
s have been resolved.
let ch = ; ch;ch;ch;ch;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
Note that when a Channel
has a transform, any values provided through Channel#tail()
will also use that transform.
let ch = x + 2; ch;ch;ch;ch;ch; console; //=> 3console; //=> 4console; //=> 5console; //=> 6
Channel#consume()
If you would like to execute a callback as soon as values can be taken from the Channel
,
you may add a consumer by using Channel#consume()
.
let ch = ;ch; await ch;await ch;await ch;await ch; // console logs//=> 1//=> 2//=> 3//=> 4
Channel#consume()
can also be handled asynchronously, and will not attempt to queue up another Channel#take()
until the consumer callback has completed running.
let ch = ;let arr = ;ch; await ch;await ch;await ch;await ch; // console logs, once a second//=> 1//=> 2//=> 3//=> 4
Channel#produce()
Similar to Channel#consume()
, Channel#produce()
will put returned values
onto the Channel
as soon as space becomes available.
let ch = ;let counter = 0;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
As with Channel#consume()
, Channel#produce()
can also work asynchronously.
let ch = ;let counter = 0;ch; console; //=> 1, after 1 secondconsole; //=> 2, after 2 secondsconsole; //=> 3, after 3 secondsconsole; //=> 4, after 4 seconds
Channel.from()
If you have an iterable item which you would like to convert into a Channel
,
use Channel.from()
to construct a Channel
from that iterable.
let arr = 1 2 3 ;let ch = Channel; console; //=> 1console; //=> 2console; //=> 3
Note that in this case, a buffer is created with the size of the iterable,
all values are placed directly onto the buffer, and the Channel
is marked as closed,
which will include any attached downstream pipes.
If the channel or any downstream pipes should remain open to continue receiving puts,
pass in a true
as the second argument.
let arr = 1 2 3 ;let ch = Channel;ch;ch; console; //=> 1console; //=> 2console; //=> 3console; //=> 4
License
All code released under the MIT license.