sfc: Streams Flow Control
Node streams are great, I love them, but always felt that something was missing. When you work with streams it's always linear.. I mean, you read something, then throw a bunch of transforms and then write that info somewhere.
pipeline and finish tools from stream module are great, but still you don't have much control on how data flows, specially if you need parallel execution, or flow to different streams on certain conditons, etc.
That's why I've created sfc. You can think of sfc as a stateless workflow. You can flow a message contitionally through one or multiple streams, transform them, apply a bunch of rules, chain many rules together and flow data to them when specific events are fired, join multiple sources on a single stream, or just flow the first one that arrived, and then you can wrap everything in a goal, for ease of processing.
Even better, you can define all your streams in a YAML or JSON file!
Basically this tool lets you define pipeflows instead of pipelines.
- In a pipeline you connect one pipe after another, forming a straight line.
- In pipeflows you orchestrate how pipes are connected to each other.
What's next
Add streams and custom types from npm modules
Stream Flow Editor
There is a vscode extension available for visually editing stream flows: Stream Flow Editor
Installation
npm install --save stream-flow-control
How to use
const {sfc} = require('stream-flow-control');
sfc.parseFile('./path/to/your/streams/definition');
const MyGoal = sfc.get('Goal', 'MyGoal');
What's new
- 1.1.0: Add support for building custom types. Now you can create a template of your stream, and reuse it within your yaml. Much as _require key in the top level, or inside a Goal, you can use _customTypes key to declare them.
- 1.2.0: Add pause/resume features into manager.
Tools
sfc
This singleton is what you'd mostly interact. It has two crucial jobs.
The first is to manage instances of different kinds of streams. Use get method to retrieve an instance, and set method to register an instance. Classes in this project autoregisters to sfc when you instance them, provided you gave it a name.
The second crucial job is to parse YAML or JSON files, or direcories with files and build pipeflows. Anything built through files is retrievable by sfc.
Classes
Flow
The following schemas explain, in part, how different flow classes work
-
This schema shows the "when" condition. You can attach functions that evaluate the message. If the function returns true, the message is passed to every stream attached to the function, else no message is sent. You can attach a stream to a special condition called "none". If no "when" condition is met, then the message will be sent to the "none" stream.
-
Sends a message for each element of the message. Tipically used when the message is an array and you need to process each item individually.
-
Lets only the first message that arrives to pass through. By default, it groups the messages from different sources by the order in which they arrive, to determine which is the first, and which gets discarded, but the condition can be overwritten.
-
Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.
-
Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.
-
Check all piped streams, and flow data to the first that matches criteria.
-
Wait untill every source ended to emit end event.
Goal
Goal is a 'black box' kind of class. You can hide complex piping and rules inside one of these.
Goal is a Writable, so it's got one input, but has two outputs: resolve and reject.
// With Goal you can:
//Pipe to another stream
goal.resolve(successStream);
goal.reject(failStream);
reader.pipe(goal);
//Use node style callback
reader.pipe(goal).callback((err, data) => {
//... do something
});
//Use Promises
reader.pipe(goal)
.then(data => {})
.catch(err => {});
//Listen to events
goal.on('resolve', (data) => {});
goal.on('reject', (err) => {});
reader.pipe(goal);
Goal hooks to error event from all it's children and emits a reject (on all it's variants: event, promise, callback, pipe), so make sure to channel all your errors through the callback if you are using a Transform or Writable within your goal.
Rule
It's a class designed to model complex business logic, but it could be used for any purpose. You must override the method rule. Within that method you can emit as many events as you like, or return any value.
cont rule = new Rule({
rule: (payload) => {
if(payload.data.x) {
// Every stream with someEvent will be sent streamed with data
this.emit('someEvent', payload.data);
// Every stream with someEvent will be sent streamed with data
this.emit('otherEvent', 'arbitrary message');
// If we don't return anything,
// nothing will be piped to resolve nor reject streams
} else if (payload.data.y) {
// We can modify payload
payload.data = {new: object};
// If we return true, modified payload will be sent to resolveStream
return true;
} else if(payload.data.z) {
// If false is returned, payload will be sent to rejectStream
return false;
} else if(payload.data.a) {
// If we return anything but 'true' or 'false' it will be sent to rejectStream
return 'An error message';
} else if(payload.data.b) {
//If we throw something, it will be sent to rejectStream
throw new Error('This is an exception');
}
}
});
//When 'someEvent' is fired, data will be piped to someStream
rule.chain('someEvent', someStream);
//When 'otherEvent' is fired, data will be piped to otherStream
rule.chain('otherEvent', otherStream);
//When rule is resolved, data will be piped to resolveStream
rule.resolve(resolveStream);
//When rule is rejected, data will be piped to rejectStream
rule.reject(rejectStream);
YAML/JSON
You can configure your pipeflow through a (not so) simple yaml/json file.
This is a standard pipeflow definition:
# Pipeflow definition
_require: #1
'{Router}': express #2
path: path #3
_customTypes: #4
typeName: #5
constructor: |
code of constructor that must return
a new stream
nameOfStream1: #6
# Stream definition
nameOfStream2: #6
# Stream definition
# .. define as many streams as you need
As you can see, to define multiple streams, you declare them one under the other, and then, in the stream definition, you can reference them by type and name to pipe it with other streams.
This is true for all streams, except for Goal, that it's the only one that has the special method build where you can declare child streams, but we'll se that in detail later.
Now this is a standard stream definition:
nameOfStrem: #6
type: #7
options: #8
someOption: someValue #9
someMethod: #10
_type: method
params:
- array
- with
- param
- names
code:
- console.log('lines of code')
- console.log('multiple lines if array')
constructor: | #11
return new MyStream(options)
methods: #12
methodName:
params:
- method
- param
code: |
code of
param
on: #13
eventName:
params:
- data
code:
- do something
- with data
once: #14
eventName:
params:
- data
code:
- do something
- with data
pon: #15
eventName:
params:
- data
code:
- do something
- with data
ponce: #16
eventName:
params:
- data
code:
- do something
- with data
when: #17
- cond:
- body of function
dst:
- type: dstType
name: dstName
pipe: #18
- type: dstType
name: dstName
none: #19
- type: dstType
name: dstName
resolve: #20
- type: dstType
name: dstName
reject: #21
- type: dstType
name: dstName
chain: #22
eventName:
- type: dstType
name: dstName
build: #23
# Pipeflow definition
Not all keys apply for every stream definition, but here it's placed all posibilities. Now I'll explain each part of the definition:
-
_require: There are certain definitions that later gets compiled to functions. If inside that function you need to call outside modules like crypto, path, etc, you can define them here, and they will be placed inside every compiled function's scope, along with sfc and every class defined here, Process and Global.
-
If inside _require you defined something like
'{MyClass, MySecondClass}': mymodule
, it's the equivalent as declaring `const {MyClass, MySecondClass} = require('mymodule')`` -
If inside _require you defined something like
name: mymodule
, it's the equivalent as declaringconst name = require('mymodule')
-
_customTypes: If you got a kind of stream that you plan to use multiple times within your pipeflow, you can define that type here. Once you define a type, inside your stream definition you can use it as a value for the type key.
-
A type definition requires a constructor key, where the stream get's built. You must return a stream instance.
-
Stream name. It's how this stream will be identified (along with type) by
sfc.get(...)
. It's the same asnew StreamType({name: 'nameOfStrem'})
for classes defined here. -
Type of stream we are building. It's how this stream will be identified (along with type) by
sfc.get(...)
. It's the only mandatory parameter a stream definition needs. If you are building any of the classes defined in this project, then the value should be the class name, for example FlowAll, Goal, Transform, etc. But really, the value can be anything you imagine. Just rememeber to place the type name correctly when you reference them. -
Options to be passed to the stream constructor.
-
If inside options you defined something like
someOption: someValue
, it's the same asnew MyStream({someOption: 'someValue'})
. Of course, this can be anything that yaml supports, as objects, arrays, etc. -
If the option is an object, and the object contains
_type: method
, this option will be compiled to a function. So, this would be the equivalent as declaring `new MyStream({someMethod: function(array, with, param, names) { ... }})`` -
If you are using a class (that must be declared in context through _require, or be part of this project) and you don't want the type to match the class name, or maybe the stream is created by calling a method like
fs.createReadStream()
, you'll need to define a constructor. The constructor function will always receive an options parameter, so there's no need to define params key. In this definition you should just place the function body that returns the stream. -
If you need to overwrite some method, you can declare it in the methods section. A method definition must consist of a method name, params and function code. For classes defined in this project here are the possible methods you could overwrite:
- Writable
- write
- writev
- destroy
- final
- Readable
- read
- destroy
- Duplex
- write
- writev
- read
- destroy
- final
- Transform
- transform
- flush
- FlowHold
- hold
- release
- check
- FlowJoin
- join
- Rule
- rule
- FlowFirst
- identify
- match
- criteria
- Writable
-
Register a listener for an event. Equivalent to declare
MyStream.on('eventName', function(data) {})
-
Register a listener for an event. Equivalent to declare
MyStream.once('eventName', function(data) {})
-
Register a listener for an event. Equivalent to declare
MyStream.prependListener('eventName', function(data) {})
-
Register a listener for an event. Equivalent to declare
MyStream.prependOnceListener('eventName', function(data) {})
-
Specific definition for FlowAll and FlowOne classes. It registers a conditional piping to a stream. It needs 2 parameters:
- cond: Condition to wich a payload is evaluated. This will be compiled to a function, and will always receive a 'payload' as a parameter. You should return true or false if you want to pipe data to dst.
- dst: Stream identifier if cond returns true. It must consist of an object defining a type and name.
-
It can be used with any class that defines a pipe method. It can also be defined as an array of detinations.
-
It can be used with any class that defines a none method. It depends on each class that implements this method on how it's used, but the standard way would be that data is sent through none when no suitable stream could be found previously.
-
It can be used with any class that defines a resolve method for piping to another stream.
-
It can be used with any class that defines a reject method for piping to another stream.
-
Specific to Rule class. You should define an event name, and streams to pipe to when this event is fired.
-
Specific to Goal class. Here you define the 'pipeflow' messages will travel since they enter this goal. In build you should place the exact same definition as a general pipeflow. You can define anything you like, even child goals, but there are several considerations you should keep in mind:
- If inside a build you define a _require, it will be merged with the one defined in parent scope (if defined). So there's no need to declare twice something in child _require's.
- Inside a build definition, you must declare a
__goal__
key. This marks the starting point where child streams will be piped. - Inside a build definition, you can declare the strings
__resolve__
or__reject__
, and messages will be sent to anything that was piped to the corresponding goal's methods.
myGoal: type: Goal build: __goal__: - type: Transform name: myTransform myTransform: type: Transform methods: transform: params: - payload - encoding - callback code: | try { this.push(JSON.stringify(payload)); callback(); } catch(e) { callback(e); } pipe: __resolve__
Remember that a goal hooks to the 'error' event of all it's children, so in this case, if payload is an object that can be strinifyed, then the message will be piped to any stream that hooked to the resovle method of the goal. If payload cannot be stringifyed and an exception is thrown, the exception will be piped to any stream that hooked to the reject method of the goal
A note on compiling functions
There are some definitions in yaml/json that are compiled to functions. In every definition (except when specifically stated) you'll need 2 keys
- params: It's an array of strings that defines how parameters will be named and accessed in the function's code
- code: The body of the function. It can be stated as a string, or an array of strings that later will be joined with a new line.
so, for example the following definitions are equivalent:
methods:
myMethod:
params:
- data
- callback
code: |
console.log(data)
callback()
---
methods:
myMethod:
params:
- data
- callback
code:
- console.log(data)
- callback()
both of this definitinons compile to the following function:
function myMethod(data, callback) {
console.log(data)
callback()
}
A note on referencing
The definitions that are used for piping data to other streams are when, pipe, none, resolve, reject and chain. In everyone of them you must define a reference to one or more streams. This means that you can, for example define de following
pipe:
type: aType
name: aName
That it would be translated as
myStream.pipe(sfc.get('aType', 'aName'))
But you could also pipe to multiple streams like
pipe:
- type: aType
name: aName
- type: bType
name: bName
That it would be translated as
myStream.pipe(sfc.get('aType', 'aName'))
myStream.pipe(sfc.get('bType', 'bName'))
When a message arrives to myStream it will be delivered to both piped streams.
A note on scopes
The only element that creates a diferent scope is a Goal instance. Because elements are only build when they are accessed, elements that where declared within an external or child goal may not yet exist. So as a rule, it's safe to reference streams that exist on the same scope, or in the parent scope, but do not try to reference a stream that's in a child or sibling scope.
API
Classes
-
Goal ⇐
Writable
-
Goal wrapps many streams within, hidding complex business logic. Represents a goal to achieve.
- DataWrapper
-
Helper class that wrapps streaming payloads inside a
Goal
. It holds history of in which streams it was flown through. -
ReadableWrapper ⇐
Readable
-
Wrapper for Readable streams that knows what to do with DataWrapper messages.
-
WritableWrapper ⇐
Writable
-
Wrapper for Writable streams that knows what to do with DataWrapper messages.
-
TransformWrapper ⇐
Transform
-
Wrapper for Transform streams that knows what to do with DataWrapper messages.
-
DuplexWrapper ⇐
Duplex
-
Wrapper for Duplex streams that knows what to do with DataWrapper messages.
-
Manager ⇐
Writable
-
Mannager is your "toolbelt" class. It's responsible to regiter classes, fetch instances, parse yaml files and building stream chains.
-
Rule ⇐
Writable
-
A rule represents a piece of business logic. Streams can be chained by an arbitrary event name, or by resolve/reject functions.
-
FlowWait ⇐
Readable
-
Wait untill every source ended to emit end event.
-
FlowJoin ⇐
Readable
-
Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.
-
FlowAll ⇐
Writable
-
Check all piped streams, and flow data to all which matches criteria.
-
FlowHold ⇐
Readable
-
Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.
-
FlowOne ⇐
Writable
-
Check all piped streams, and flow data to the first that matches criteria.
-
FlowEach ⇐
Writable
-
Stream every element of an array individually.
-
FlowFirst ⇐
Readable
-
Flow the first message from multiple source streams and discard the rest.
Typedefs
-
Condition ⇒
boolean
-
Condition function for flowing to a stream
-
Process :
function
-
Payload processing method
-
Then ⇒
Promise
-
Thenable function
-
Catch ⇒
Promise
-
Thenable function
-
Identify ⇒
any
-
Returns the identity of a message. It could be an internal id, or an id of a wrapped message.
-
NodeCallback :
function
-
Node style callback
-
RuleProcess ⇒
boolean
|any
|undefined
-
Process rules. You may emit events to send data to streams.
Writable
Goal ⇐ Goal wrapps many streams within, hidding complex business logic. Represents a goal to achieve.
Kind: global class
Extends: Writable
-
Goal ⇐
Writable
new Goal([options])
Create a Goal stream
Param | Type | Description |
---|---|---|
[options] | object |
Global options |
[options.name] | string |
Name for this stream |
goal.resolve(dst)
Pipe to a destination stream when goal was achieved
Kind: instance method of Goal
Param | Type |
---|---|
dst | Writable |
goal.callback(cb)
Register a node style callback, called when goal is resolved or rejected
Kind: instance method of Goal
Param | Type |
---|---|
cb | NodeCallback |
goal.reject(dst)
Pipe to a destination stream when goal was not achieved
Kind: instance method of Goal
Param | Type |
---|---|
dst | Writable |
Promise
goal.then(thenCallback, [catchCallback]) ⇒ Thenable function to work with promises.
Kind: instance method of Goal
Param | Type | Description |
---|---|---|
thenCallback | ThenCallback |
called when goal is achieved |
[catchCallback] | CatchCallback |
called when goal is not achieved |
Promise
goal.catch(catchCallback) ⇒ Thenable function to work with promises.
Kind: instance method of Goal
Param | Type | Description |
---|---|---|
catchCallback | CatchCallback |
called when goal is not achieved |
DataWrapper
Helper class that wrapps streaming payloads inside a Goal
. It holds history of in which streams it was flown through.
Readable
ReadableWrapper ⇐ Wrapper for Readable streams that knows what to do with DataWrapper messages.
Kind: global class
Extends: Readable
Writable
WritableWrapper ⇐ Wrapper for Writable streams that knows what to do with DataWrapper messages.
Kind: global class
Extends: Writable
Transform
TransformWrapper ⇐ Wrapper for Transform streams that knows what to do with DataWrapper messages.
Kind: global class
Extends: Transform
Duplex
DuplexWrapper ⇐ Wrapper for Duplex streams that knows what to do with DataWrapper messages.
Kind: global class
Extends: Duplex
Writable
Manager ⇐ Mannager is your "toolbelt" class. It's responsible to regiter classes, fetch instances, parse yaml files and building stream chains.
Kind: global class
Extends: Writable
-
Manager ⇐
Writable
-
.clean() ⇒
Manager
-
.pause() ⇒
Manager
-
.resume() ⇒
Manager
- .set(type, [name], elem)
-
.get(type, name) ⇒
Readable
|Writable
|null
- .parse(confStr, cb)
-
.parseFiles([filePaths]) ⇒
Manager
-
.clean() ⇒
Manager
sfc.clean() ⇒ Clean namager state
Useful when you need to start again
Kind: instance method of Manager
Returns: Manager
- 'this' element for chainable purpose.
Manager
sfc.pause() ⇒ Set manager in paused mode.
Use it before parsing files like sfc.pause().parseFiles(...)
Useful when you need all streams to be created before piping
Kind: instance method of Manager
Returns: Manager
- 'this' element for chainable purpose.
Manager
sfc.resume() ⇒ Set inner streams in fowing mode.
Use it after parsing files like sfc.pause().parseFiles(...).resume()
Useful when you need all streams to be created before piping
Kind: instance method of Manager
Returns: Manager
- 'this' element for chainable purpose.
sfc.set(type, [name], elem)
Register a stream
Kind: instance method of Manager
Param | Type | Description |
---|---|---|
type | string |
type of stream |
[name] | string |
name for this stream |
elem |
Writable | Readable
|
the stream to register |
Readable
| Writable
| null
sfc.get(type, name) ⇒ Retrieve a stream by type and name. If stream has ended, creates a new instance
Kind: instance method of Manager
Returns: Readable
| Writable
| null
- returns the indicated stream or null if not found
Param | Type | Description |
---|---|---|
type | string |
type of stream |
name | string |
the name of the stream to retrieve |
sfc.parse(confStr, cb)
Parse a JSON or YAML string and create streams
Kind: instance method of Manager
Param | Type | Description |
---|---|---|
confStr | string |
JSON or YAML string. |
cb | NodeCallback |
Callback returning an error if occurred. |
Manager
sfc.parseFiles([filePaths]) ⇒ Parse a file or directory and build streams. If no parameter is passed, parseFiles will automatically search for sfc directory, and files that ends with .sfc, .sfc.yaml, .sfc.yml or .sfc.json.
Kind: instance method of Manager
Returns: Manager
- returns 'this' for chainable purposes
Param | Type | Description |
---|---|---|
[filePaths] |
string | array
|
path of yaml/json file or a directory containing those files. |
Writable
Rule ⇐ A rule represents a piece of business logic. Streams can be chained by an arbitrary event name, or by resolve/reject functions.
Kind: global class
Extends: Writable
new Rule([options])
Create a Rule stream
Param | Type | Description |
---|---|---|
[options] | object |
Global options |
[options.name] | string |
Name for this stream |
options.rule | RuleProcess |
Method that defines the rule logic. If inside the rule you emit an event with data, it will be sent to streams attached to the corresponding event on chain method. If this function returns exactly true, incoming payload will be sent to streams attached in resolve function. If this function returns exactly false, incoming payload will be sent to streams attached in reject function. If this function returns undefined, no message will be sent to either streams attached to resolve or reject. If anything else is returned, or an exception is thrown, that will be sent to streams attached in reject function. |
rule.chain(event, dst)
Register streams to pipe to when event is fired.
Kind: instance method of Rule
Param | Type | Description |
---|---|---|
event | string |
event name. |
dst | Writable |
stream to pipe to. |
rule.resolve(dst)
Register streams to pipe to when options.rule returns true
Kind: instance method of Rule
Param | Type | Description |
---|---|---|
dst | Writable |
stream to pipe to |
rule.reject(dst)
Register streams to pipe to when options.rule returns anything else but true, or when an exception is thrown.
Kind: instance method of Rule
Param | Type | Description |
---|---|---|
dst | Writable |
stream to pipe to |
Readable
FlowWait ⇐ Wait untill every source ended to emit end event.
Kind: global class
Extends: Readable
new FlowWait(options)
Create a FlowWait stream
Param | Type | Description |
---|---|---|
options | object |
Global options. |
[options.name] | string |
Name for this stream. |
Readable
FlowJoin ⇐ Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.
Kind: global class
Extends: Readable
new FlowJoin(options)
Create a FlowJoin stream
Param | Type | Description |
---|---|---|
options | object |
Global options. |
[options.name] | string |
Name for this stream. |
[options.join] | Process |
How messages are joined. |
Writable
FlowAll ⇐ Check all piped streams, and flow data to all which matches criteria.
Kind: global class
Extends: Writable
-
FlowAll ⇐
Writable
- new FlowAll([options])
-
.when(cond, dst) ⇒
Writable
|FlowAll
-
.pipe(dst) ⇒
Writable
|FlowAll
-
.none(dst) ⇒
Writable
|FlowAll
- ._write(payload, encoding, cb)
new FlowAll([options])
Create a FlowAll stream
Param | Type | Description |
---|---|---|
[options] | object |
Global options |
[options.name] | string |
Name for this stream |
Writable
| FlowAll
flowAll.when(cond, dst) ⇒ Set stream(s) to pipe if criteria is match
Kind: instance method of FlowAll
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type |
---|---|
cond | Condition |
dst |
Writable | Array.<Writable>
|
Writable
| FlowAll
flowAll.pipe(dst) ⇒ Set stream(s) to pipe unconditionally
Kind: instance method of FlowAll
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type |
---|---|
dst |
Writable | Array.<Writable>
|
Writable
| FlowAll
flowAll.none(dst) ⇒ Set stream(s) to pipe when no other stream matched criteria. If unconditional piping was set, this will never be used.
Kind: instance method of FlowAll
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type | Description |
---|---|---|
dst |
Writable | Array.<Writable>
|
Destination stream |
flowAll._write(payload, encoding, cb)
Internal _write method. Do not call directly. Do not override unless you are sure of what you are doing
Kind: instance method of FlowAll
Param | Type | Description |
---|---|---|
payload |
DataWrapper | *
|
chunk of data to be written |
encoding | string |
encoding of data when objectMode=false. Never used. |
cb | WriteCallback |
Internal Writable callback |
Readable
FlowHold ⇐ Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.
Kind: global class
Extends: Readable
-
FlowHold ⇐
Readable
new FlowHold(options)
Create a FlowHold stream
Param | Type | Description |
---|---|---|
options | object |
Global options. |
[options.name] | string |
Name for this stream. |
[options.hold] | Process |
How messages are stored. |
[options.check] | function |
Checks a certain condition is met and call options.release. |
[options.release] | function |
Controls how messages are released. |
flowHold._sources
Kind: instance property of FlowHold
Properties
Name | Description |
---|---|
Source | streams |
flowHold._payloads
Kind: instance property of FlowHold
Properties
Name | Description |
---|---|
Stored | messages. There is an array for each source |
Writable
FlowOne ⇐ Check all piped streams, and flow data to the first that matches criteria.
Kind: global class
Extends: Writable
-
FlowOne ⇐
Writable
- new FlowOne([options])
-
.when(cond, dst) ⇒
Writable
|FlowAll
-
.pipe(dst) ⇒
Writable
|FlowAll
-
.none(dst) ⇒
Writable
|FlowAll
- ._write(payload, encoding, cb)
new FlowOne([options])
Create a FlowOne stream
Param | Type | Description |
---|---|---|
[options] | object |
Global options |
[options.name] | string |
Name for this stream |
Writable
| FlowAll
flowOne.when(cond, dst) ⇒ Set stream(s) to pipe if criteria is match
Kind: instance method of FlowOne
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type |
---|---|
cond | Condition |
dst |
Writable | Array.<Writable>
|
Writable
| FlowAll
flowOne.pipe(dst) ⇒ Set stream(s) to pipe unconditionally
Kind: instance method of FlowOne
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type |
---|---|
dst |
Writable | Array.<Writable>
|
Writable
| FlowAll
flowOne.none(dst) ⇒ Set stream(s) to pipe when no other stream matched criteria. If unconditional piping was set, this will never be used.
Kind: instance method of FlowOne
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type | Description |
---|---|---|
dst |
Writable | Array.<Writable>
|
Destination stream |
flowOne._write(payload, encoding, cb)
Internal _write method. Do not call directly. Do not override unless you are sure of what you are doing
Kind: instance method of FlowOne
Param | Type | Description |
---|---|---|
payload |
DataWrapper | *
|
chunk of data to be written |
encoding | string |
encoding of data when objectMode=false. Never used. |
cb | WriteCallback |
Internal Writable callback |
Writable
FlowEach ⇐ Stream every element of an array individually.
Kind: global class
Extends: Writable
-
FlowEach ⇐
Writable
- new FlowEach([options])
-
.pipe(dst) ⇒
Writable
|FlowAll
-
.none(dst) ⇒
Writable
|FlowAll
new FlowEach([options])
Create a FlowEach stream
Param | Type | Description |
---|---|---|
[options] | object |
Global options |
[options.name] | string |
Name for this stream |
Writable
| FlowAll
flowEach.pipe(dst) ⇒ Set stream(s) to pipe to
Kind: instance method of FlowEach
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type |
---|---|
dst |
Writable | Array.<Writable>
|
Writable
| FlowAll
flowEach.none(dst) ⇒ Set stream(s) to pipe to when no other stream was piped
Kind: instance method of FlowEach
Returns: Writable
| FlowAll
- if dst is an array, then returns "this", else dst is returned
Param | Type | Description |
---|---|---|
dst |
Writable | Array.<Writable>
|
Destination stream |
Readable
FlowFirst ⇐ Flow the first message from multiple source streams and discard the rest.
Kind: global class
Extends: Readable
-
FlowFirst ⇐
Readable
new FlowFirst(options)
Create a FlowFirst stream
Param | Type | Description |
---|---|---|
options | object |
Global options |
[options.name] | string |
Name for this stream |
options.identify | Identify |
Function that returns a message id to match |
[options.criteria] | Condition |
Add an extra criteria for message to match. Return false to discard the message. |
[options.match] | Process |
How messages are matched according to options.identify function. |
flowFirst._sources
Kind: instance property of FlowFirst
Properties
Name | Description |
---|---|
Source | streams |
boolean
Condition ⇒ Condition function for flowing to a stream
Kind: global typedef
Returns: boolean
- if true, condition is accepted and data is flown to stream
Param | Type | Description |
---|---|---|
payload |
DataWrapper | any
|
chunk that's flown through the stream |
function
Process : Payload processing method
Kind: global typedef
Param | Type | Description |
---|---|---|
payload |
DataWrapper | any
|
chunk that's going to be processed somehow |
Promise
Then ⇒ Thenable function
Kind: global typedef
Param | Type | Description |
---|---|---|
payload |
DataWrapper | any
|
chunk that's going to be processed somehow |
Promise
Catch ⇒ Thenable function
Kind: global typedef
Param | Type | Description |
---|---|---|
error |
DataWrapper | any
|
error message |
any
Identify ⇒ Returns the identity of a message. It could be an internal id, or an id of a wrapped message.
Kind: global typedef
Returns: any
- identity of the message
Param | Type | Description |
---|---|---|
payload |
DataWrapper | any
|
chunk that's flown through the stream |
function
NodeCallback : Node style callback
Kind: global typedef
Param | Type |
---|---|
error | any |
data | any |
boolean
| any
| undefined
RuleProcess ⇒ Process rules. You may emit events to send data to streams.
Kind: global typedef
Param | Description |
---|---|
(DataWrapper | any)} |
Example
Here's a rather complex YAML configuration example. You can find how all piece together here
_require:
crypto: crypto
fs: fs
authenticate:
type: Goal
build:
__goal__:
type: FlowAll
name: choose_method
choose_method:
options:
none_reason:
code: 404
message: 'No valid authentication method provided'
type: FlowAll
when:
- cond: |
const req = payload.data
return req.headers && req.headers.authorization && /^Basic /.test(req.headers.authorization)
dst:
type: Transform
name: parse_basic
- cond: |
const req = payload.data
if(!(req.query && req.query.user && req.query.pass) && !(req.body && req.body.user && req.body.pass)) return false
req.auth = {user: req.query.user||req.body.user, pass: req.query.pass||req.body.pass}
return true
dst:
type: FlowFirst
name: first_auth
- cond: |
const req = payload.data
return (req.query && req.query.token) || (req.headers && req.headers.authorization && /^Bearer /.test(req.headers.authorization))
dst:
type: Transform
name: parse_token
none: __reject__
once:
prefinish:
code: |
const payload = new DataWrapper(this.goal._src, this);
this._write(payload, null, ()=>{});
parse_basic:
type: Transform
methods:
transform:
code: |
let basic = payload.data.headers.authorization.replace(/^Basic (.*)$/, (str, match)=>match);
let auth_array = Buffer.from(basic, 'base64').toString('utf8').split(':');
payload.data.auth = {user: auth_array[0], pass: auth_array[1]};
this.push(payload);
cb()
params:
- payload
- encoding
- cb
pipe:
type: FlowFirst
name: first_auth
first_auth:
type: FlowFirst
methods:
identify:
code: |
return payload.map.FlowAll.choose_method
params:
- payload
pipe:
type: Transform
name: validate_user
parse_token:
type: Transform
methods:
transform:
code: |
try {
const tokens = JSON.parse(fs.readFileSync('./tokens.json')).tokens
const token = (payload.data.query && payload.data.query.token) || payload.data.headers.authorization.replace(/^Bearer (.*)$/, (str, match)=>match)
if(!tokens.hasOwnProperty(token)) return callback(payload.getChild(this).setData({code: 401, message: 'Invalid token'}))
payload.data.userid = tokens[token]
this.push(payload)
callback()
} catch(e) {
callback(payload.getChild(this).setData({code: 500, message: e.message}))
}
params:
- payload
- encoding
- callback
pipe: __resolve__
validate_user:
type: Transform
methods:
transform:
code: |
try {
const users = JSON.parse(fs.readFileSync('./users.json')).users
const user = users.filter(u => u.user == payload.data.auth.user)
if(!user.length) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
const hash = crypto.createHash(user[0].method)
hash.update(payload.data.auth.pass)
if(user[0].password != hash.digest('hex')) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
payload.data.userid = user[0].id
delete payload.data.auth
this.push(payload)
callback()
} catch(e) {
callback(payload.getChild(this).setData({code: 500, message: e.message}))
}
params:
- payload
- encoding
- callback
pipe: __resolve__
authorise:
type: Goal
build:
__goal__:
type: Transform
name: get_user_data
get_user_data:
type: Transform
methods:
transform:
code: |
try {
const users = JSON.parse(fs.readFileSync('./users.json')).users
const user = users.filter(u => u.id == payload.data.userid)
if(!user.length) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
payload.data.user = user[0]
this.push(payload)
callback()
} catch(e) {
callback(payload.getChild(this).setData({code: 500, message: e.message}))
}
params:
- payload
- encoding
- callback
ponce:
prefinish:
code: |
const payload = new DataWrapper(this.goal._src, this);
this._write(payload, null, (err)=>{ if(err) this.emit('error', err)});
pipe:
type: FlowOne
name: auth_methods
auth_methods:
type: FlowOne
options:
none_reason:
code: 401
message: Unauthorised access
when:
cond: |
return payload.data.userid == payload.data.params.id || payload.data.user.is_admin
dst: __resolve__
none: __reject__
get_user:
type: Transform
methods:
transform:
code: |
try {
const users = JSON.parse(fs.readFileSync('./users.json')).users
const user = users.filter(u => u.id == payload.params.id)
if(!user.length) return callback({code: 404, message: 'User not found'})
this.push(JSON.stringify(user[0]))
callback()
} catch(e) {
callback({code: 500, message: e.message})
}
params:
- payload
- encoding
- callback
on:
pipe:
code:
- this._src = src
params:
- src
ponce:
prefinish:
code:
- this._write(this._src, null, (err)=>{ if(err) this.emit('error', err)});