CSP-style channel library using ES7 async/await keywords
npm install medium
Channels are queues: you can put things onto them and take things off, in first-in-first-out order. Channels can be closed, after which they will not receive or deliver values.
put and take are both asynchronous actions, and both return promises.
A put promise resolves to true if the value was successfully added to the channel, or false if the channel is closed.
A take promise resolves either to whatever was next in the channel queue, or to the constant CLOSED if the channel is closed. For example:
    let ch1 = chan()

    put(ch1, 1)
    take(ch1).then(::console.log)
    // LOGS: 1

    take(ch1).then(::console.log)
    put(ch1, 2)
    // LOGS: 2

    take(ch1).then(::console.log)
    close(ch1)
    // LOGS: CLOSED

    put(ch1, 3).then(::console.log)
    // LOGS: false
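To make the promise semantics above concrete, here is a minimal sketch of an unbuffered channel written from scratch. It is not the library's implementation: the `chan`, `put`, `take`, `close` and `CLOSED` names below are local stand-ins, and the choice to resolve parked puts with `false` on close is an assumption of this sketch.

```javascript
// Stand-in for the library's CLOSED constant (assumption of this sketch).
const CLOSED = Symbol('CLOSED')

// A channel here is just two FIFO queues of waiting parties plus a flag.
function chan() {
  return { puts: [], takes: [], closed: false }
}

function put(ch, value) {
  if (ch.closed) return Promise.resolve(false)
  // A taker is already waiting: hand the value over right away.
  if (ch.takes.length > 0) {
    ch.takes.shift()(value)
    return Promise.resolve(true)
  }
  // Otherwise park the put until a matching take arrives.
  return new Promise(resolve => ch.puts.push({ value, resolve }))
}

function take(ch) {
  if (ch.closed) return Promise.resolve(CLOSED)
  // A parked put is waiting: deliver its value and resolve that put.
  if (ch.puts.length > 0) {
    const { value, resolve } = ch.puts.shift()
    resolve(true)
    return Promise.resolve(value)
  }
  // Otherwise wait for a future put (or for close).
  return new Promise(resolve => ch.takes.push(resolve))
}

function close(ch) {
  ch.closed = true
  // Waiting takers can never be served now, so release them with CLOSED.
  ch.takes.splice(0).forEach(resolve => resolve(CLOSED))
  // Sketch assumption: parked puts are released with false.
  ch.puts.splice(0).forEach(({ resolve }) => resolve(false))
}

const demo = chan()
put(demo, 1)
take(demo).then(value => console.log(value)) // logs 1
```

Both queues are drained from the front, which is what gives the first-in-first-out behavior described above.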
The strategy with which a channel handles an excess of
puts is implemented as a
buffer. The default channel does not allow for any buffered values, so if you
put without a waiting
take for the value, it will not resolve the
put until a corresponding
take is added. For example:
    let ch1 = chan()

    put(ch1, 1).then(() => console.log('put 1'))
    put(ch1, 2).then(() => console.log('put 2'))

    take(ch1)
    // LOGS: 'put 1'
    take(ch1)
    // LOGS: 'put 2'
An example of a different buffer would be a "fixed" buffer, which has N slots for
put values to wait for a
take. For example:
    let ch = chan()
    let fixedCh = chan(buffers.fixed(2)) // or shortcut with chan(2)

    put(ch, 1).then(::console.log)
    // LOGS NOTHING

    put(fixedCh, 1).then(() => console.log('put 1'))
    // LOGS: put 1
    put(fixedCh, 2).then(() => console.log('put 2'))
    // LOGS: put 2
    put(fixedCh, 3).then(() => console.log('put 3'))
    // LOGS NOTHING

    take(fixedCh).then(::console.log)
    // LOGS: 1
    // LOGS: put 3
The other included buffers are "dropping" and "sliding". A dropping buffer allows N puts, then begins dropping them: the put resolves successfully, but the value is not added to the channel. A sliding buffer allows N puts, then begins shifting the buffer, dropping the oldest buffered put value and adding the newest to the other end.
    let ch = chan(buffers.dropping(2))

    put(ch, 1)
    put(ch, 2)
    put(ch, 3) // this is dropped

    take(ch).then(::console.log)
    // LOGS: 1
    take(ch).then(::console.log)
    // LOGS: 2
    take(ch).then(::console.log)
    // LOGS NOTHING
    put(ch, 3)
    // LOGS: 3
    let ch = chan(buffers.sliding(2))

    put(ch, 1)
    put(ch, 2)
    put(ch, 3) // this causes the put of 1 to be dropped

    take(ch).then(::console.log)
    // LOGS: 2
    take(ch).then(::console.log)
    // LOGS: 3
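The difference between the three buffer strategies comes down to what happens when a value arrives and the buffer is already full. The following is a rough sketch of that decision only, using hypothetical `fixedSketch`, `droppingSketch` and `slidingSketch` helpers that are not part of the library. It assumes a size of at least 1.

```javascript
// Each sketch buffer exposes `values` and an `add` method.
// `add` returns true when the put may resolve immediately,
// and false when the put would have to wait (park).

function fixedSketch(size) {
  const values = []
  return {
    values,
    add(value) {
      if (values.length >= size) return false // full: the put must wait
      values.push(value)
      return true
    }
  }
}

function droppingSketch(size) {
  const values = []
  return {
    values,
    add(value) {
      // Full: the new value is discarded, but the put still succeeds.
      if (values.length < size) values.push(value)
      return true
    }
  }
}

function slidingSketch(size) {
  const values = []
  return {
    values,
    add(value) {
      // Full: the oldest value is discarded to make room for the newest.
      if (values.length >= size) values.shift()
      values.push(value)
      return true
    }
  }
}

const buffers = [fixedSketch(2), droppingSketch(2), slidingSketch(2)]
for (const buffer of buffers) {
  [1, 2, 3].forEach(value => buffer.add(value))
  console.log(buffer.values)
}
// logs [ 1, 2 ], then [ 1, 2 ], then [ 2, 3 ]
```

The fixed and dropping buffers end up holding the same values; they differ in whether the third put is left waiting or is told it succeeded.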
Of course, you may need to filter or modify values as they are put onto the channel. Transducers are the best option here, and are fully supported.
    import t from 'transducers-js'

    let shouts = chan(null, t.map(str => `${str}!!!`))

    put(shouts, 'HAI')
    take(shouts).then(::console.log)
    // LOGS: 'HAI!!!'
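For readers new to transducers, the core idea can be sketched in plain functions: a transducer wraps a "step" function and decides, per value, whether and in what form to pass it along. This is a simplified illustration with hypothetical `mapSketch` and `filterSketch` helpers. It does not use the transducers-js library or its transformer protocol.

```javascript
// A step function takes an accumulator and a value and returns the accumulator.
const append = (list, value) => {
  list.push(value)
  return list
}

// Transforms each value before handing it to the next step.
const mapSketch = fn => step => (acc, value) => step(acc, fn(value))

// Passes a value on only if the predicate holds; otherwise skips it.
const filterSketch = pred => step => (acc, value) =>
  pred(value) ? step(acc, value) : acc

const shout = mapSketch(str => `${str}!!!`)
const onlyOdd = filterSketch(n => n % 2 === 1)

console.log(['HAI'].reduce(shout(append), []))     // logs [ 'HAI!!!' ]
console.log([1, 2, 3].reduce(onlyOdd(append), [])) // logs [ 1, 3 ]
```

Because the transformation is independent of where values end up, the same logic could feed an array, as here, or a channel's buffer.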
Things get much more interesting though when we use async/await to better coordinate our channels.
    import t from 'transducers-js'
    import { chan, put, take, sleep, go } from '../lib/index'

    let numbers = chan()
    let oddNumbers = chan(null, t.filter(n => n % 2))

    go(async () => {
      while (true) {
        console.log('an odd number: ', await take(oddNumbers))
      }
    })

    go(async () => {
      while (true) {
        let n = await numbers // awaiting a channel is an implied "take"
        await put(oddNumbers, n)
      }
    })

    go(async () => {
      while (true) {
        let randomNum = Math.floor(Math.random() * 100)
        await put(numbers, randomNum)
        await sleep(1000)
      }
    })
So we have a number being generated every second, and put onto the
numbers channel. This is consumed and tested for "oddness", and if it passes, then it is put onto the
oddNumbers channel where it is simply console.log'ed.
What if we want to keep track of the percent odd vs. even? We can put a bit of local state in the process that checks for oddness. However, mutating state sucks, so, we use the function
repeat to both act as a
while loop and manage state immutably!
    import { chan, put, take, sleep, go, repeat } from '../lib/index'

    let numbers = chan()
    let oddNumbers = chan()
    let stats = chan()

    go(async () => {
      while (true) {
        console.log('an odd number: ', await oddNumbers)
      }
    })

    go(async () => {
      while (true) {
        console.log('Stats: ', await stats)
      }
    })

    go(async () => {
      repeat(async ({ total, odds }) => {
        put(stats, `${odds / total * 100}% odd numbers`)
        let n = await numbers
        if (n % 2) {
          put(oddNumbers, n)
          return { total: total + 1, odds: odds + 1 }
        } else {
          return { total: total + 1, odds }
        }
      }, { total: 0, odds: 0 })
    })

    go(async () => {
      while (true) {
        let randomNum = Math.floor(Math.random() * 100)
        await put(numbers, randomNum)
        await sleep(1000)
      }
    })
And now we see that, indeed, our universe isn't broken, and over time our cumulative chance of an odd number closes in on 50%.
We can even take our
repeat function one step further, and use
repeatTake, since that is exactly what we are doing.
    go(async () => {
      repeatTake(numbers, async (n, { total, odds }) => {
        put(stats, `${odds / total * 100}% odd numbers`)
        if (n % 2) {
          put(oddNumbers, n)
          return { total: total + 1, odds: odds + 1 }
        } else {
          return { total: total + 1, odds }
        }
      }, { total: 0, odds: 0 })
    })
So we just change the signature a bit, and our local "repeat" state is passed as the second argument instead of the first.
    import { chan, go, put, close, take, sleep, repeatTake, CLOSED } from '../lib/index'

    let player = async (name, table) => {
      repeatTake(table, async (ball, state) => {
        if (ball === CLOSED) {
          console.log(`${name} hit the ball ${state.hitCount} times!`)
          return false // returning false is how you BREAK a repeat or repeatTake
        }
        console.log(name, ball.hits)
        await sleep(100)
        put(table, ball)
        // return a value to store it as state, and access it as the second argument above
        return { hitCount: state.hitCount + 1 }
      }, { hitCount: 0 })
    }

    go(async () => {
      let table = chan()

      player('ping', table)
      player('pong', table)

      put(table, { hits: 0 })
      await sleep(1000)
      close(table)
    })
More documentation is coming, but the core functionality is ~160LOC, so it should just take a single cup of coffee to read through. I wanted to be sure that the API was built deliberately, and not just a port from some previous effort.
Creates a channel. All arguments are optional.
numOfBuffer - Any number or buffer. A number is a shortcut for a fixed buffer of that size.
xducer - a transducer to process/filter values with.
Puts a value onto a channel. Returned promise resolves to true if successful, or false if the channel is closed.
Takes a value from a channel. Returned Promise resolves to taken value or CLOSED constant if the channel is closed.
Immediately invokes (and returns) given function.
Creates a promise that will resolve successfully after the given number of milliseconds.
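Both of these helpers are small enough to sketch in a line each. The versions below are stand-ins written for illustration, not the library's source. They assume that the invoking helper hands back whatever the function returns (for an async function, its promise) and that the delay is given in milliseconds, as the `sleep(1000)` calls in the examples suggest.

```javascript
// Invoke the function right away and pass back what it returns.
// For an async function, that is a promise for its eventual result.
const go = fn => fn()

// A promise that resolves once the timer fires.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

go(async () => {
  await sleep(20)
  console.log('woke up')
})
```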
A constant, which all takes on a closed channel receive instead of a value.
Closes a channel. Once a channel is closed, puts resolve to false and takes resolve to CLOSED.
Makes a new channel, same as the old channel.
Like alts in Clojure's core.async.
ports can be a channel to take from, a promise to resolve, or an array to put data onto a channel, like [ theChannel, valueToPut ].
If none of them have a pending value, it will resolve with whichever channel receives a value next. If one of the channels has a pending value already, it will simply resolve to that. If more than one channel has a pending value, it selects one in a non-deterministic fashion.
Always resolves with a pair of [ theResolvedValue, theSourceChannel ].
All non-winning actions will be canceled so that their data does not go missing.
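The shape of the result can be illustrated with promises alone. The sketch below uses a hypothetical `anySketch` helper built on `Promise.race`; it covers only the promise case and does no cancellation, so it is an illustration of the `[ value, source ]` pair rather than of how the library handles channels.

```javascript
// Resolve with [value, source] for whichever promise settles first.
function anySketch(...promises) {
  return Promise.race(
    promises.map(promise => promise.then(value => [value, promise]))
  )
}

// Helper for the demo: a promise that resolves to `value` after `ms` milliseconds.
const after = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms))

const slow = after(50, 'slow')
const fast = after(5, 'fast')

anySketch(slow, fast).then(([value, source]) => {
  console.log(value)           // logs 'fast'
  console.log(source === fast) // logs true
})
```

Returning the source alongside the value is what lets the caller tell which of several ports produced the result.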
I don't love
while loops, so I use this instead.
As a bonus, you can track state without mutations! Return a value other than false, and it will be available as the argument to your callback async function. Pass in a
seed value as the second argument to repeat.
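The state-threading loop described here can be sketched in a few lines. `repeatSketch` below is a hypothetical stand-in, not the library's code. It assumes that the callback's return value becomes the next state and that `false` stops the loop; resolving with the last state is a choice of this sketch only.

```javascript
// Call `fn` with the current state until it returns false.
async function repeatSketch(fn, seed) {
  let state = seed
  while (true) {
    const result = await fn(state)
    if (result === false) return state // false breaks the loop
    state = result                     // anything else is the next state
  }
}

// Count how many of the numbers 1 to 5 are odd, with no mutation in the callback.
repeatSketch(async ({ total, odds }) => {
  if (total === 5) return false
  const n = total + 1
  return { total: n, odds: n % 2 ? odds + 1 : odds }
}, { total: 0, odds: 0 }).then(final => console.log(final))
// logs { total: 5, odds: 3 }
```

The only mutable variable lives inside the helper, so the callback itself can stay a pure function of its state argument.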
This is just like
repeat above, except that before it repeats, it waits for a successful
take on the given channel. Then it passes this taken value in as the first argument, with any local state being passed as the second argument.
See the ping/pong example above to see this in action.
Creates a new channel that will receive all puts to the received channels.
No buffer space. This is the default when the first argument to chan is falsy.
Buffer has space of
num. Any extra
puts are parked.
Buffer simply slides across pending puts as a window of
num width. So, oldest puts are dropped as new ones are added.
Buffer drops, and resolves, any extra puts beyond num.