Pipe
Pipe connects I/O work and computation so they can run together; see the examples below.
install
yarn add @swnb/pipe
example
import { Pipe } from './pipe'
// create pipe 1 with capacity 10
const p1 = Pipe.new<number>(10)
// create pipe 2 with capacity 10
const p2 = Pipe.new<number>(10)
setInterval(() => {
  p1.write(1) // p1 receives the number 1 every 10 ms
}, 10)
// reducer1 is an accumulator; a reducer may also return a Promise
const reducer1 = (acc: number, cur: number) => acc + cur
// pipe p1 into p2 through reducer1
// this creates a stream that emits every 10 ms
p1.reduce(reducer1, 0).pipeTo(p2)
;(async () => {
  // read results in a loop
  for (;;) {
    const result = await p2.read()
    console.log(result.value) // prints 1, 2, 3, 4, 5, 6, ...
  }
})()
p1 produces the number 1 every 10 ms. Each value passes through the reducer (acc, cur) => acc + cur (initial value 0) and the result is piped to p2, so p2 reads 1, 2, 3, 4, 5, ... at a rate of one value every 10 ms.
The flow graph below shows what happens:
flowchart LR
pipe1[[pipe 1]]
pipe2[[pipe 2]]
input([1,1,1,1,...]) --> pipe1 -. acc = acc + cur .->pipe2 --> output([1,2,3,4,...])
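The accumulation shown in the graph can be checked without the library. The following is a minimal sketch in plain TypeScript; `scan` is a hypothetical helper, not part of @swnb/pipe, and it only mimics what the reducer stage emits, assuming every intermediate accumulator value is forwarded downstream (as the printed 1, 2, 3, ... in the example suggests). Timing and pipe capacity are ignored.

```typescript
// Hypothetical helper, not part of @swnb/pipe: applies a reducer to each
// input in order and records every intermediate accumulator value.
function scan<T, R>(inputs: T[], reducer: (acc: R, cur: T) => R, initial: R): R[] {
  const outputs: R[] = []
  let acc = initial
  for (const cur of inputs) {
    acc = reducer(acc, cur)
    outputs.push(acc)
  }
  return outputs
}

// Same accumulator as reducer1 in the example above
const sum = (acc: number, cur: number) => acc + cur

// Five writes of the number 1, roughly what p1 receives in the first 50 ms
const emitted = scan([1, 1, 1, 1, 1], sum, 0)
console.log(emitted) // [ 1, 2, 3, 4, 5 ]
```

Each element of `emitted` corresponds to one value that p2 would read in the example.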
Let's look at another example:
import { Pipe } from './pipe'
const p1 = Pipe.new<number>(10)
const p2 = Pipe.new<number>(10)
const p3 = Pipe.new<string>(10)
const p4 = Pipe.new<string[]>(10)
// reducer1 is an accumulator
const reducer1 = (acc: number, cur: number) => acc + cur
// delay resolves its promise after `duration` milliseconds
const delay = async (duration: number) => {
  await new Promise(res => {
    setTimeout(res, duration)
  })
}
// reducer2 waits 1 second, then joins each value into a comma-separated string
const reducer2 = async (acc: string, cur: number) => {
  await delay(1000)
  const result = acc === '' ? `${cur}` : `${acc},${cur}`
  return result
}
// mapper3 splits each string into a string[] on ','
const mapper3 = (_: any, cur: string) => cur.split(',')
p1.connect(reducer1, 0).pipeTo(p2)
p2.connect(reducer2, '').pipeTo(p3)
p3.connect(mapper3, []).pipeTo(p4)
setInterval(() => {
  p1.write(1)
}, 10)
setTimeout(() => {
  p1.close()
}, 10000)
;(async () => {
  for (;;) {
    const result = await p4.read()
    console.log(result.value) // prints [ '1' ], [ '1', '2' ], [ '1', '2', '3' ], [ '1', '2', '3', '4' ], ...
  }
})()
The flow graph below shows what happens:
flowchart TD
input([1,1,1,1,...])
pipe1[[pipe 1]]
pipe2[[pipe 2]]
pipe3[[pipe 3]]
pipe4[[pipe 4]]
pipe5[[pipe 5]]
pipe6[[pipe 6]]
input -.-> pipe1 -. acc = acc + cur .-> pipe2
pipe2 -. multiple .-> pipe3 & pipe4
pipe3 -. block 1 second .->pipe5
pipe4 -. some .-> pipe6
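The values printed by the p1 to p4 code example above can be traced by hand. The sketch below is plain TypeScript and does not use the library; `addUp`, `joinWithComma`, `splitByComma`, and `trace` are hypothetical names that mirror reducer1, reducer2, and mapper3. It assumes each stage forwards its result for every input value, and it leaves out the 1 second delay so that it finishes immediately.

```typescript
// Mirrors reducer1: running sum
const addUp = (acc: number, cur: number) => acc + cur

// Mirrors reducer2: async join with ','. The 1 second delay from the
// example is intentionally omitted; only the async shape is kept.
const joinWithComma = async (acc: string, cur: number) =>
  acc === '' ? `${cur}` : `${acc},${cur}`

// Mirrors mapper3: ignores the accumulator and splits the current string
const splitByComma = (_: string[], cur: string) => cur.split(',')

// Hypothetical helper: pushes each written value through the three stages
// in order and collects what the last stage would emit.
async function trace(writes: number[]): Promise<string[][]> {
  const outputs: string[][] = []
  let total = 0
  let joined = ''
  for (const cur of writes) {
    total = addUp(total, cur) // value p2 would receive
    joined = await joinWithComma(joined, total) // value p3 would receive
    outputs.push(splitByComma([], joined)) // value p4 would receive
  }
  return outputs
}

trace([1, 1, 1]).then(result => console.log(result))
// [ [ '1' ], [ '1', '2' ], [ '1', '2', '3' ] ]
```

Because the async stage is awaited before the next value is handled, the output order matches the input order even when a stage is slow.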