stream-flow-control

1.4.4 • Public • Published

sfc: Streams Flow Control

pipeline status coverage report

Node streams are great, I love them, but always felt that something was missing. When you work with streams it's always linear.. I mean, you read something, then throw a bunch of transforms and then write that info somewhere.

pipeline and finish tools from stream module are great, but still you don't have much control on how data flows, specially if you need parallel execution, or flow to different streams on certain conditons, etc.

That's why I've created sfc. You can think of sfc as a stateless workflow. You can flow a message contitionally through one or multiple streams, transform them, apply a bunch of rules, chain many rules together and flow data to them when specific events are fired, join multiple sources on a single stream, or just flow the first one that arrived, and then you can wrap everything in a goal, for ease of processing.

Even better, you can define all your streams in a YAML or JSON file!

Basically this tool lets you define pipeflows instead of pipelines.

  • In a pipeline you connect one pipe after another, forming a straight line.
  • In pipeflows you orchestrate how pipes are connected to each other.

What's next

Add streams and custom types from npm modules

Stream Flow Editor

There is a vscode extension available for visually editing stream flows: Stream Flow Editor

Stream Flow Editor

Installation

npm install --save stream-flow-control

How to use

const {sfc} = require('stream-flow-control');

sfc.parseFile('./path/to/your/streams/definition');

const MyGoal = sfc.get('Goal', 'MyGoal');

What's new

  • 1.1.0: Add support for building custom types. Now you can create a template of your stream, and reuse it within your yaml. Much as _require key in the top level, or inside a Goal, you can use _customTypes key to declare them.
  • 1.2.0: Add pause/resume features into manager.

Tools

sfc

This singleton is what you'd mostly interact. It has two crucial jobs.

The first is to manage instances of different kinds of streams. Use get method to retrieve an instance, and set method to register an instance. Classes in this project autoregisters to sfc when you instance them, provided you gave it a name.

The second crucial job is to parse YAML or JSON files, or direcories with files and build pipeflows. Anything built through files is retrievable by sfc.

Classes

Flow

The following schemas explain, in part, how different flow classes work

  • FlowAll

    This schema shows the "when" condition. You can attach functions that evaluate the message. If the function returns true, the message is passed to every stream attached to the function, else no message is sent. You can attach a stream to a special condition called "none". If no "when" condition is met, then the message will be sent to the "none" stream.

    FlowAll

  • FlowEach

    Sends a message for each element of the message. Tipically used when the message is an array and you need to process each item individually.

    FlowEach

  • FlowFirst

    Lets only the first message that arrives to pass through. By default, it groups the messages from different sources by the order in which they arrive, to determine which is the first, and which gets discarded, but the condition can be overwritten.

    FlowFirst

  • FlowHold

    Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.

    FlowHold

  • FlowJoin

    Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.

    FlowJoin

  • FlowOne

    Check all piped streams, and flow data to the first that matches criteria.

    FlowOne

  • FlowWait

    Wait untill every source ended to emit end event.

    FlowWait

Goal

Goal is a 'black box' kind of class. You can hide complex piping and rules inside one of these.

Goal

Goal is a Writable, so it's got one input, but has two outputs: resolve and reject.

// With Goal you can:

//Pipe to another stream
goal.resolve(successStream);
goal.reject(failStream);
reader.pipe(goal);

//Use node style callback
reader.pipe(goal).callback((err, data) => {
    //... do something
});

//Use Promises
reader.pipe(goal)
    .then(data => {})
    .catch(err => {});

//Listen to events
goal.on('resolve', (data) => {});
goal.on('reject', (err) => {});
reader.pipe(goal);

Goal hooks to error event from all it's children and emits a reject (on all it's variants: event, promise, callback, pipe), so make sure to channel all your errors through the callback if you are using a Transform or Writable within your goal.

Rule

It's a class designed to model complex business logic, but it could be used for any purpose. You must override the method rule. Within that method you can emit as many events as you like, or return any value.

Rule

cont rule = new Rule({
    rule: (payload) => {
        if(payload.data.x) {
            // Every stream with someEvent will be sent streamed with data
            this.emit('someEvent', payload.data);
            
            // Every stream with someEvent will be sent streamed with data
            this.emit('otherEvent', 'arbitrary message');
            
            // If we don't return anything, 
            // nothing will be piped to resolve nor reject streams
        } else if (payload.data.y) {
            // We can modify payload
            payload.data = {new: object};

            // If we return true, modified payload will be sent to resolveStream
            return true;
        } else if(payload.data.z) {

            // If false is returned, payload will be sent to rejectStream
            return false;
        } else if(payload.data.a) {

            // If we return anything but 'true' or 'false' it will be sent to rejectStream
            return 'An error message';
        } else if(payload.data.b) {

            //If we throw something, it will be sent to rejectStream
            throw new Error('This is an exception');
        }
    }
});

//When 'someEvent' is fired, data will be piped to someStream
rule.chain('someEvent', someStream);

//When 'otherEvent' is fired, data will be piped to otherStream
rule.chain('otherEvent', otherStream);

//When rule is resolved, data will be piped to resolveStream
rule.resolve(resolveStream);

//When rule is rejected, data will be piped to rejectStream
rule.reject(rejectStream);

YAML/JSON

You can configure your pipeflow through a (not so) simple yaml/json file.

This is a standard pipeflow definition:

# Pipeflow definition
_require: #1
    '{Router}': express #2
    path: path #3
_customTypes: #4
    typeName: #5
        constructor: | 
            code of constructor that must return 
            a new stream
nameOfStream1: #6
    # Stream definition
nameOfStream2: #6
    # Stream definition
# .. define as many streams as you need

As you can see, to define multiple streams, you declare them one under the other, and then, in the stream definition, you can reference them by type and name to pipe it with other streams.

This is true for all streams, except for Goal, that it's the only one that has the special method build where you can declare child streams, but we'll se that in detail later.

Now this is a standard stream definition:

nameOfStrem: #6
    type: #7
    options: #8
        someOption: someValue #9
        someMethod: #10
            _type: method
            params:
                - array
                - with
                - param
                - names
            code:
                - console.log('lines of code')
                - console.log('multiple lines if array')
    constructor: | #11
        return new MyStream(options)
    methods: #12
        methodName:
            params:
                - method
                - param
            code: |
                code of
                param
    on: #13
        eventName: 
            params:
                - data
            code:
                - do something
                - with data
    once: #14
        eventName:
            params:
                - data
            code:
                - do something
                - with data
    pon: #15
        eventName:
            params:
                - data
            code:
                - do something
                - with data
    ponce: #16
        eventName:
            params:
                - data
            code:
                - do something
                - with data
    when: #17
        - cond:
            - body of function
          dst:
            - type: dstType
              name: dstName
    pipe: #18
        - type: dstType
          name: dstName
    none: #19
        - type: dstType
          name: dstName
    resolve: #20
        - type: dstType
          name: dstName
    reject: #21
        - type: dstType
          name: dstName
    chain: #22
        eventName:
            - type: dstType
              name: dstName
    build: #23
        # Pipeflow definition

Not all keys apply for every stream definition, but here it's placed all posibilities. Now I'll explain each part of the definition:

  1. _require: There are certain definitions that later gets compiled to functions. If inside that function you need to call outside modules like crypto, path, etc, you can define them here, and they will be placed inside every compiled function's scope, along with sfc and every class defined here, Process and Global.

  2. If inside _require you defined something like '{MyClass, MySecondClass}': mymodule, it's the equivalent as declaring `const {MyClass, MySecondClass} = require('mymodule')``

  3. If inside _require you defined something like name: mymodule, it's the equivalent as declaring const name = require('mymodule')

  4. _customTypes: If you got a kind of stream that you plan to use multiple times within your pipeflow, you can define that type here. Once you define a type, inside your stream definition you can use it as a value for the type key.

  5. A type definition requires a constructor key, where the stream get's built. You must return a stream instance.

  6. Stream name. It's how this stream will be identified (along with type) by sfc.get(...). It's the same as new StreamType({name: 'nameOfStrem'}) for classes defined here.

  7. Type of stream we are building. It's how this stream will be identified (along with type) by sfc.get(...). It's the only mandatory parameter a stream definition needs. If you are building any of the classes defined in this project, then the value should be the class name, for example FlowAll, Goal, Transform, etc. But really, the value can be anything you imagine. Just rememeber to place the type name correctly when you reference them.

  8. Options to be passed to the stream constructor.

  9. If inside options you defined something like someOption: someValue, it's the same as new MyStream({someOption: 'someValue'}). Of course, this can be anything that yaml supports, as objects, arrays, etc.

  10. If the option is an object, and the object contains _type: method, this option will be compiled to a function. So, this would be the equivalent as declaring `new MyStream({someMethod: function(array, with, param, names) { ... }})``

  11. If you are using a class (that must be declared in context through _require, or be part of this project) and you don't want the type to match the class name, or maybe the stream is created by calling a method like fs.createReadStream(), you'll need to define a constructor. The constructor function will always receive an options parameter, so there's no need to define params key. In this definition you should just place the function body that returns the stream.

  12. If you need to overwrite some method, you can declare it in the methods section. A method definition must consist of a method name, params and function code. For classes defined in this project here are the possible methods you could overwrite:

    • Writable
      • write
      • writev
      • destroy
      • final
    • Readable
      • read
      • destroy
    • Duplex
      • write
      • writev
      • read
      • destroy
      • final
    • Transform
      • transform
      • flush
    • FlowHold
      • hold
      • release
      • check
    • FlowJoin
      • join
    • Rule
      • rule
    • FlowFirst
      • identify
      • match
      • criteria
  13. Register a listener for an event. Equivalent to declare MyStream.on('eventName', function(data) {})

  14. Register a listener for an event. Equivalent to declare MyStream.once('eventName', function(data) {})

  15. Register a listener for an event. Equivalent to declare MyStream.prependListener('eventName', function(data) {})

  16. Register a listener for an event. Equivalent to declare MyStream.prependOnceListener('eventName', function(data) {})

  17. Specific definition for FlowAll and FlowOne classes. It registers a conditional piping to a stream. It needs 2 parameters:

    • cond: Condition to wich a payload is evaluated. This will be compiled to a function, and will always receive a 'payload' as a parameter. You should return true or false if you want to pipe data to dst.
    • dst: Stream identifier if cond returns true. It must consist of an object defining a type and name.
  18. It can be used with any class that defines a pipe method. It can also be defined as an array of detinations.

  19. It can be used with any class that defines a none method. It depends on each class that implements this method on how it's used, but the standard way would be that data is sent through none when no suitable stream could be found previously.

  20. It can be used with any class that defines a resolve method for piping to another stream.

  21. It can be used with any class that defines a reject method for piping to another stream.

  22. Specific to Rule class. You should define an event name, and streams to pipe to when this event is fired.

  23. Specific to Goal class. Here you define the 'pipeflow' messages will travel since they enter this goal. In build you should place the exact same definition as a general pipeflow. You can define anything you like, even child goals, but there are several considerations you should keep in mind:

    • If inside a build you define a _require, it will be merged with the one defined in parent scope (if defined). So there's no need to declare twice something in child _require's.
    • Inside a build definition, you must declare a __goal__ key. This marks the starting point where child streams will be piped.
    • Inside a build definition, you can declare the strings __resolve__ or __reject__, and messages will be sent to anything that was piped to the corresponding goal's methods.
    myGoal:
      type: Goal
      build:
        __goal__:
          - type: Transform
            name: myTransform
        myTransform:
          type: Transform
          methods:
            transform:
              params:
                - payload
                - encoding
                - callback
              code: |
                try {
                    this.push(JSON.stringify(payload));
                    callback();
                } catch(e) {
                    callback(e);
                }
          pipe: __resolve__

    Remember that a goal hooks to the 'error' event of all it's children, so in this case, if payload is an object that can be strinifyed, then the message will be piped to any stream that hooked to the resovle method of the goal. If payload cannot be stringifyed and an exception is thrown, the exception will be piped to any stream that hooked to the reject method of the goal

A note on compiling functions

There are some definitions in yaml/json that are compiled to functions. In every definition (except when specifically stated) you'll need 2 keys

  • params: It's an array of strings that defines how parameters will be named and accessed in the function's code
  • code: The body of the function. It can be stated as a string, or an array of strings that later will be joined with a new line.

so, for example the following definitions are equivalent:

methods:
  myMethod:
    params:
      - data
      - callback
    code: |
      console.log(data)
      callback()
---
methods:
  myMethod:
    params:
      - data
      - callback
    code:
      - console.log(data)
      - callback()

both of this definitinons compile to the following function:

function myMethod(data, callback) {
    console.log(data)
    callback()
}

A note on referencing

The definitions that are used for piping data to other streams are when, pipe, none, resolve, reject and chain. In everyone of them you must define a reference to one or more streams. This means that you can, for example define de following

pipe:
    type: aType
    name: aName

That it would be translated as

myStream.pipe(sfc.get('aType', 'aName'))

But you could also pipe to multiple streams like

pipe:
  - type: aType
    name: aName
  - type: bType
    name: bName

That it would be translated as

myStream.pipe(sfc.get('aType', 'aName'))
myStream.pipe(sfc.get('bType', 'bName'))

When a message arrives to myStream it will be delivered to both piped streams.

A note on scopes

The only element that creates a diferent scope is a Goal instance. Because elements are only build when they are accessed, elements that where declared within an external or child goal may not yet exist. So as a rule, it's safe to reference streams that exist on the same scope, or in the parent scope, but do not try to reference a stream that's in a child or sibling scope.

API

Classes

GoalWritable

Goal wrapps many streams within, hidding complex business logic. Represents a goal to achieve.

DataWrapper

Helper class that wrapps streaming payloads inside a Goal. It holds history of in which streams it was flown through.

ReadableWrapperReadable

Wrapper for Readable streams that knows what to do with DataWrapper messages.

WritableWrapperWritable

Wrapper for Writable streams that knows what to do with DataWrapper messages.

TransformWrapperTransform

Wrapper for Transform streams that knows what to do with DataWrapper messages.

DuplexWrapperDuplex

Wrapper for Duplex streams that knows what to do with DataWrapper messages.

ManagerWritable

Mannager is your "toolbelt" class. It's responsible to regiter classes, fetch instances, parse yaml files and building stream chains.

RuleWritable

A rule represents a piece of business logic. Streams can be chained by an arbitrary event name, or by resolve/reject functions.

FlowWaitReadable

Wait untill every source ended to emit end event.

FlowJoinReadable

Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.

FlowAllWritable

Check all piped streams, and flow data to all which matches criteria.

FlowHoldReadable

Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.

FlowOneWritable

Check all piped streams, and flow data to the first that matches criteria.

FlowEachWritable

Stream every element of an array individually.

FlowFirstReadable

Flow the first message from multiple source streams and discard the rest.

Typedefs

Conditionboolean

Condition function for flowing to a stream

Process : function

Payload processing method

ThenPromise

Thenable function

CatchPromise

Thenable function

Identifyany

Returns the identity of a message. It could be an internal id, or an id of a wrapped message.

NodeCallback : function

Node style callback

RuleProcessboolean | any | undefined

Process rules. You may emit events to send data to streams.

Goal ⇐ Writable

Goal wrapps many streams within, hidding complex business logic. Represents a goal to achieve.

Kind: global class
Extends: Writable

new Goal([options])

Create a Goal stream

Param Type Description
[options] object Global options
[options.name] string Name for this stream

goal.resolve(dst)

Pipe to a destination stream when goal was achieved

Kind: instance method of Goal

Param Type
dst Writable

goal.callback(cb)

Register a node style callback, called when goal is resolved or rejected

Kind: instance method of Goal

Param Type
cb NodeCallback

goal.reject(dst)

Pipe to a destination stream when goal was not achieved

Kind: instance method of Goal

Param Type
dst Writable

goal.then(thenCallback, [catchCallback]) ⇒ Promise

Thenable function to work with promises.

Kind: instance method of Goal

Param Type Description
thenCallback ThenCallback called when goal is achieved
[catchCallback] CatchCallback called when goal is not achieved

goal.catch(catchCallback) ⇒ Promise

Thenable function to work with promises.

Kind: instance method of Goal

Param Type Description
catchCallback CatchCallback called when goal is not achieved

DataWrapper

Helper class that wrapps streaming payloads inside a Goal. It holds history of in which streams it was flown through.

Kind: global class

ReadableWrapper ⇐ Readable

Wrapper for Readable streams that knows what to do with DataWrapper messages.

Kind: global class
Extends: Readable

WritableWrapper ⇐ Writable

Wrapper for Writable streams that knows what to do with DataWrapper messages.

Kind: global class
Extends: Writable

TransformWrapper ⇐ Transform

Wrapper for Transform streams that knows what to do with DataWrapper messages.

Kind: global class
Extends: Transform

DuplexWrapper ⇐ Duplex

Wrapper for Duplex streams that knows what to do with DataWrapper messages.

Kind: global class
Extends: Duplex

Manager ⇐ Writable

Mannager is your "toolbelt" class. It's responsible to regiter classes, fetch instances, parse yaml files and building stream chains.

Kind: global class
Extends: Writable

sfc.clean() ⇒ Manager

Clean namager state

Useful when you need to start again

Kind: instance method of Manager
Returns: Manager - 'this' element for chainable purpose.

sfc.pause() ⇒ Manager

Set manager in paused mode.

Use it before parsing files like sfc.pause().parseFiles(...)

Useful when you need all streams to be created before piping

Kind: instance method of Manager
Returns: Manager - 'this' element for chainable purpose.

sfc.resume() ⇒ Manager

Set inner streams in fowing mode.

Use it after parsing files like sfc.pause().parseFiles(...).resume()

Useful when you need all streams to be created before piping

Kind: instance method of Manager
Returns: Manager - 'this' element for chainable purpose.

sfc.set(type, [name], elem)

Register a stream

Kind: instance method of Manager

Param Type Description
type string type of stream
[name] string name for this stream
elem Writable | Readable the stream to register

sfc.get(type, name) ⇒ Readable | Writable | null

Retrieve a stream by type and name. If stream has ended, creates a new instance

Kind: instance method of Manager
Returns: Readable | Writable | null - returns the indicated stream or null if not found

Param Type Description
type string type of stream
name string the name of the stream to retrieve

sfc.parse(confStr, cb)

Parse a JSON or YAML string and create streams

Kind: instance method of Manager

Param Type Description
confStr string JSON or YAML string.
cb NodeCallback Callback returning an error if occurred.

sfc.parseFiles([filePaths]) ⇒ Manager

Parse a file or directory and build streams. If no parameter is passed, parseFiles will automatically search for sfc directory, and files that ends with .sfc, .sfc.yaml, .sfc.yml or .sfc.json.

Kind: instance method of Manager
Returns: Manager - returns 'this' for chainable purposes

Param Type Description
[filePaths] string | array path of yaml/json file or a directory containing those files.

Rule ⇐ Writable

A rule represents a piece of business logic. Streams can be chained by an arbitrary event name, or by resolve/reject functions.

Kind: global class
Extends: Writable

new Rule([options])

Create a Rule stream

Param Type Description
[options] object Global options
[options.name] string Name for this stream
options.rule RuleProcess Method that defines the rule logic. If inside the rule you emit an event with data, it will be sent to streams attached to the corresponding event on chain method. If this function returns exactly true, incoming payload will be sent to streams attached in resolve function. If this function returns exactly false, incoming payload will be sent to streams attached in reject function. If this function returns undefined, no message will be sent to either streams attached to resolve or reject. If anything else is returned, or an exception is thrown, that will be sent to streams attached in reject function.

rule.chain(event, dst)

Register streams to pipe to when event is fired.

Kind: instance method of Rule

Param Type Description
event string event name.
dst Writable stream to pipe to.

rule.resolve(dst)

Register streams to pipe to when options.rule returns true

Kind: instance method of Rule

Param Type Description
dst Writable stream to pipe to

rule.reject(dst)

Register streams to pipe to when options.rule returns anything else but true, or when an exception is thrown.

Kind: instance method of Rule

Param Type Description
dst Writable stream to pipe to

FlowWait ⇐ Readable

Wait untill every source ended to emit end event.

Kind: global class
Extends: Readable

new FlowWait(options)

Create a FlowWait stream

Param Type Description
options object Global options.
[options.name] string Name for this stream.

FlowJoin ⇐ Readable

Join messages from multiple sources into a single messages. If no method is overwitten, then releases a message when there is exactly one message from each source.

Kind: global class
Extends: Readable

new FlowJoin(options)

Create a FlowJoin stream

Param Type Description
options object Global options.
[options.name] string Name for this stream.
[options.join] Process How messages are joined.

FlowAll ⇐ Writable

Check all piped streams, and flow data to all which matches criteria.

Kind: global class
Extends: Writable

new FlowAll([options])

Create a FlowAll stream

Param Type Description
[options] object Global options
[options.name] string Name for this stream

flowAll.when(cond, dst) ⇒ Writable | FlowAll

Set stream(s) to pipe if criteria is match

Kind: instance method of FlowAll
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type
cond Condition
dst Writable | Array.<Writable>

flowAll.pipe(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe unconditionally

Kind: instance method of FlowAll
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type
dst Writable | Array.<Writable>

flowAll.none(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe when no other stream matched criteria. If unconditional piping was set, this will never be used.

Kind: instance method of FlowAll
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type Description
dst Writable | Array.<Writable> Destination stream

flowAll._write(payload, encoding, cb)

Internal _write method. Do not call directly. Do not override unless you are sure of what you are doing

Kind: instance method of FlowAll

Param Type Description
payload DataWrapper | * chunk of data to be written
encoding string encoding of data when objectMode=false. Never used.
cb WriteCallback Internal Writable callback

FlowHold ⇐ Readable

Hold flow from multiple source streams until a condition is met. If no method is overwitten, then holds flow until end is signaled on all sources.

Kind: global class
Extends: Readable

new FlowHold(options)

Create a FlowHold stream

Param Type Description
options object Global options.
[options.name] string Name for this stream.
[options.hold] Process How messages are stored.
[options.check] function Checks a certain condition is met and call options.release.
[options.release] function Controls how messages are released.

flowHold._sources

Kind: instance property of FlowHold
Properties

Name Description
Source streams

flowHold._payloads

Kind: instance property of FlowHold
Properties

Name Description
Stored messages. There is an array for each source

FlowOne ⇐ Writable

Check all piped streams, and flow data to the first that matches criteria.

Kind: global class
Extends: Writable

new FlowOne([options])

Create a FlowOne stream

Param Type Description
[options] object Global options
[options.name] string Name for this stream

flowOne.when(cond, dst) ⇒ Writable | FlowAll

Set stream(s) to pipe if criteria is match

Kind: instance method of FlowOne
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type
cond Condition
dst Writable | Array.<Writable>

flowOne.pipe(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe unconditionally

Kind: instance method of FlowOne
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type
dst Writable | Array.<Writable>

flowOne.none(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe when no other stream matched criteria. If unconditional piping was set, this will never be used.

Kind: instance method of FlowOne
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type Description
dst Writable | Array.<Writable> Destination stream

flowOne._write(payload, encoding, cb)

Internal _write method. Do not call directly. Do not override unless you are sure of what you are doing

Kind: instance method of FlowOne

Param Type Description
payload DataWrapper | * chunk of data to be written
encoding string encoding of data when objectMode=false. Never used.
cb WriteCallback Internal Writable callback

FlowEach ⇐ Writable

Stream every element of an array individually.

Kind: global class
Extends: Writable

new FlowEach([options])

Create a FlowEach stream

Param Type Description
[options] object Global options
[options.name] string Name for this stream

flowEach.pipe(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe to

Kind: instance method of FlowEach
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type
dst Writable | Array.<Writable>

flowEach.none(dst) ⇒ Writable | FlowAll

Set stream(s) to pipe to when no other stream was piped

Kind: instance method of FlowEach
Returns: Writable | FlowAll - if dst is an array, then returns "this", else dst is returned

Param Type Description
dst Writable | Array.<Writable> Destination stream

FlowFirst ⇐ Readable

Flow the first message from multiple source streams and discard the rest.

Kind: global class
Extends: Readable

new FlowFirst(options)

Create a FlowFirst stream

Param Type Description
options object Global options
[options.name] string Name for this stream
options.identify Identify Function that returns a message id to match
[options.criteria] Condition Add an extra criteria for message to match. Return false to discard the message.
[options.match] Process How messages are matched according to options.identify function.

flowFirst._sources

Kind: instance property of FlowFirst
Properties

Name Description
Source streams

Condition ⇒ boolean

Condition function for flowing to a stream

Kind: global typedef
Returns: boolean - if true, condition is accepted and data is flown to stream

Param Type Description
payload DataWrapper | any chunk that's flown through the stream

Process : function

Payload processing method

Kind: global typedef

Param Type Description
payload DataWrapper | any chunk that's going to be processed somehow

Then ⇒ Promise

Thenable function

Kind: global typedef

Param Type Description
payload DataWrapper | any chunk that's going to be processed somehow

Catch ⇒ Promise

Thenable function

Kind: global typedef

Param Type Description
error DataWrapper | any error message

Identify ⇒ any

Returns the identity of a message. It could be an internal id, or an id of a wrapped message.

Kind: global typedef
Returns: any - identity of the message

Param Type Description
payload DataWrapper | any chunk that's flown through the stream

NodeCallback : function

Node style callback

Kind: global typedef

Param Type
error any
data any

RuleProcess ⇒ boolean | any | undefined

Process rules. You may emit events to send data to streams.

Kind: global typedef

Param Description
(DataWrapper any)}

Example

Here's a rather complex YAML configuration example. You can find how all piece together here

_require: 
    crypto: crypto
    fs: fs
authenticate:
    type: Goal
    build:
        __goal__: 
            type: FlowAll
            name: choose_method
        choose_method:
            options:
                none_reason: 
                    code: 404
                    message: 'No valid authentication method provided'
            type: FlowAll
            when:
            -   cond: |
                    const req = payload.data
                    return req.headers && req.headers.authorization && /^Basic /.test(req.headers.authorization)
                dst:
                    type: Transform
                    name: parse_basic
            -   cond: |
                    const req = payload.data
                    if(!(req.query && req.query.user && req.query.pass) && !(req.body && req.body.user && req.body.pass)) return false
                    req.auth = {user: req.query.user||req.body.user, pass: req.query.pass||req.body.pass}
                    return true
                dst:
                    type: FlowFirst
                    name: first_auth
            -   cond: |
                    const req = payload.data
                    return (req.query && req.query.token) || (req.headers && req.headers.authorization && /^Bearer /.test(req.headers.authorization))
                dst:
                    type: Transform
                    name: parse_token
            none: __reject__
            once:
                prefinish:
                    code: |
                        const payload = new DataWrapper(this.goal._src, this);
                        this._write(payload, null, ()=>{});
        parse_basic:
            type: Transform
            methods:
                transform:
                    code: |
                        let basic = payload.data.headers.authorization.replace(/^Basic (.*)$/, (str, match)=>match);
                        let auth_array = Buffer.from(basic, 'base64').toString('utf8').split(':');
                        payload.data.auth = {user: auth_array[0], pass: auth_array[1]};
                        this.push(payload);
                        cb()
                    params:
                        - payload
                        - encoding
                        - cb
            pipe:
                type: FlowFirst
                name: first_auth
        first_auth:
            type: FlowFirst
            methods:
                identify:
                    code: |
                        return payload.map.FlowAll.choose_method
                    params:
                        - payload
            pipe:
                type: Transform
                name: validate_user
        parse_token:
            type: Transform
            methods:
                transform:
                    code: |
                        try {
                          const tokens = JSON.parse(fs.readFileSync('./tokens.json')).tokens
                          const token = (payload.data.query && payload.data.query.token) || payload.data.headers.authorization.replace(/^Bearer (.*)$/, (str, match)=>match)
                          if(!tokens.hasOwnProperty(token)) return callback(payload.getChild(this).setData({code: 401, message: 'Invalid token'}))
                          payload.data.userid = tokens[token]
                          this.push(payload)
                          callback()
                        } catch(e) {
                          callback(payload.getChild(this).setData({code: 500, message: e.message}))
                        }
                    params:
                        - payload
                        - encoding
                        - callback
            pipe: __resolve__
        validate_user:
            type: Transform
            methods:
                transform:
                    code: |
                        try {
                          const users = JSON.parse(fs.readFileSync('./users.json')).users
                          const user = users.filter(u => u.user == payload.data.auth.user)
                          if(!user.length) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
                          const hash = crypto.createHash(user[0].method)
                          hash.update(payload.data.auth.pass)
                          if(user[0].password != hash.digest('hex')) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
                          payload.data.userid = user[0].id
                          delete payload.data.auth
                          this.push(payload)
                          callback()
                        } catch(e) {
                          callback(payload.getChild(this).setData({code: 500, message: e.message}))
                        }
                    params:
                        - payload
                        - encoding
                        - callback
            pipe: __resolve__
authorise:
    type: Goal
    build:
        __goal__: 
            type: Transform
            name: get_user_data
        get_user_data:
            type: Transform
            methods:
                transform:
                    code: |
                        try {
                          const users = JSON.parse(fs.readFileSync('./users.json')).users
                          const user = users.filter(u => u.id == payload.data.userid)
                          if(!user.length) return callback(payload.getChild(this).setData({code: 401, message: 'Incorrect user or password'}))
                          payload.data.user = user[0]
                          this.push(payload)
                          callback()
                        } catch(e) {
                          callback(payload.getChild(this).setData({code: 500, message: e.message}))
                        }
                    params:
                        - payload
                        - encoding
                        - callback
            ponce:
                prefinish:
                    code: |
                        const payload = new DataWrapper(this.goal._src, this);
                        this._write(payload, null, (err)=>{ if(err) this.emit('error', err)});
            pipe:
                type: FlowOne
                name: auth_methods
        auth_methods:
            type: FlowOne
            options:
                none_reason:
                    code: 401
                    message: Unauthorised access
            when:
                cond: |
                    return payload.data.userid == payload.data.params.id || payload.data.user.is_admin
                dst: __resolve__
            none: __reject__
get_user:
    type: Transform
    methods:
        transform:
            code: |
                try {
                  const users = JSON.parse(fs.readFileSync('./users.json')).users
                  const user = users.filter(u => u.id == payload.params.id)
                  if(!user.length) return callback({code: 404, message: 'User not found'})
                  this.push(JSON.stringify(user[0]))
                  callback()
                } catch(e) {
                  callback({code: 500, message: e.message})
                }
            params:
                - payload
                - encoding
                - callback
    on:
        pipe:
            code:
                - this._src = src
            params:
                - src
    ponce:
        prefinish:
            code:
                - this._write(this._src, null, (err)=>{ if(err) this.emit('error', err)});

Package Sidebar

Install

npm i stream-flow-control

Weekly Downloads

1

Version

1.4.4

License

GPL-3.0

Unpacked Size

928 kB

Total Files

80

Last publish

Collaborators

  • agustin.moyano