lancaster

Reactive data API service

Lancaster

Version 0.0.4

Lancaster is a dynamic reactive dataflow system

You use Lancaster to construct topologies by adding, removing, and connecting together nodes. Each node has a fn (function) it applies to incoming messages and outputs the result.

Node fns can run pretty much any node.js javascript to do things to the messages they receive. They have a data object persists between calls that they can read or write.

Clients can access Lancaster either via a Javascript object (direct api access) or via REST and Websockets. The libraty contains an api server you can spin up when you need it.

A separate worker process you control runs the node fn processing and message routing.

// example message 
{at: 1386197370482, value: 100.00}

Running the Lancaster server provides you with an empty topology.

You create nodes in the Lancaster topology, optionally defining a processing function, and which other nodes it receives messages from.

Any node in the toplogy can have a message injected in to it from outside the topology.

Each node has a procssing function that is applied to messages is receives. The default function is a passthru that outputs the same message it receives.

Nodes can have attributes set on them that can be changed at any time, and can be used or changed by the node's function.

A predefined set of functions is available. User defined functions can be provided to the topology as part of it's configuraion.

A node can have any number of source nodes, allowing you to create signal processing and switching chains, and to combine multiple inputs in to a single output stream.

The topology can be managed via a javascript interface, REST or Websockets.

Results can be streamed out in real time via a javascript event emitter or Websockets.

Lancaster has a runner that starts up a server and a worker. This is the easiest way to get moving fast.

sudo apt-get install redis-server
git clone https://github.com/simonswain/lancaster.git
cd lancaster
npm install
cp config.sample.js config.js
node run

Now surf to http://localhost:4002 to see the REST server:

{"lancaster":{"prefix":"df","env":"development","redis":{"host":"127.0.0.1","port":6379},"server":{"host":"127.0.0.1","port":4002}}}

You can use the curl examples at the bottom of this README to try it out.

Lancaster needs Redis.

sudo apt-get install redis-server

To use Lancaster as a module in your own code, install via NPM.

npm install lancaster
var Lancaster = require('lancaster');
 
// you can pass configuration options in here. If none are given, 
// defaults will be used 
 
var api = Lancaster.api();
 
// use the Topology 
 
api.quit();

To run the Lancaster webserver, follow the example in server.js. You will probably still want to install Lancaster as a module, and manage the server using the provided api.

var Lancaster = require('lancaster');
var server = Lancaster.server();
server.start(function(){
  console.log( 'Lancaster Server Running...' );
});

A sample config file is provided in config.sample.js. Pass this file in when you instantiate the api or server. The config object is the same in both cases. The api only uses the redis and prefix options.

The server needs all the options shown in the sample.

The development values shown in the sample are the same as the defaults used if no config is provided.

Normally you'll create your own config.js based on the sample.

var Lancaster = require('lancaster');
var config = { ... my config options ... };
var api = Lancaster.api(config);
// ... 

The server object provides and api, and a REST server is started for http access.

The api and server as passive -- they modify the topology and inject/receive messages.

The Worker is a long-running process that does the actual work of moving messages around and running the nodes.

An example worker runner is in 'worker.js`. Most likely you will want to implement a version of this in your own project so you have proper control over the worker lifecycle.

var Lancaster = require('lancaster');
var worker = Lancaster.worker();
worker.start(function(){
  console.log('Lancaster Worker running...');
});

Currently Lancaster only safely supports having one worker. If you are happy your messages can be processed in parallel without causing issues, there is nothing stopping you running more than one, and this is a good way to scale up. If you are doing tasks like working out averages or counting things, then parallel workers are probably not a good idea.

There are a couple of things that can be added later to improve this:

  • Have Lancaster run nodes in parallel, but ensure only one of any given node runs at a time.

  • Extend the parallel runner to speficy if a given node can have multiple parallel instances, or must only ever have on running.

a node has a structure like this:

{
  id: 'my-node-id',
  sources: [],
  fn: 'my-fn'
  attrs: {}
}

id is arbitrary. You provide an ID when creating the node. Best to stick to a short (say up to 32 characters) alphanumeric string.

sources is an array of other node id's that this node will receive messages from.

fn is the processing function applied to messages the node receives. The output of this function is latched to the node's output (it remains there available for reading until the next output is created) and is injected in to any listening nodes.

attrs is an object for holding state. When the node is created, you can provide seed attrs to override defaults provided by the nodes's fn. A node's attrs can be read and mutated by the it's fn. The attrs are persisted after the function application is completed on each message, so can be used to calculate running totals or store data across multiple messages.

The default set of fns is in lib/fns.js. It is expected you will create your own set appropriate to your use-case and pass these in when creating your topology

var myFns = require('./path-to/my-fins.js');
var api = Lancaster.api({fns: myFns});

A fn has a signature like this:

fns.count = {
  attrs: {
    total: 0
  },
  initfunction(done){
    done();
  },
  processfunction(attrsmessagedone){
    attrs.total ++;
    done(null, attrs, {total:attrs.total});
  }
};

attrs are the default attrs for each node that uses the fn. When the node is added, and attrs provided replace the defaults in the fn.

init() is called once when the node is created.

process() is passed each incoming message. messages are processed sequentially - each message will be fully processed before the next is started. The attrs passed in the process is the current state of the function. Your process function can modify these. When done is called, Lancaster will persist the changed attrs to storage.

When you have finished processing, call done() with an Error (if any, or null), the attrs (which you may or may not have changed) and the message to output, or if there is no message to output (i.e. leave blank).

Clone the repo

git clone https://github.com/simonswain/lancaster.git
cd lancaster
npm install
cp config.sample.js config.js

If you want to read the code, start with lib/index.js - this is the main entry point. It is loaded (via the top level index.js) when you require('lancaster').

Ensure you have grunt-cli installed

npm install grunt-cli -g

Run lint and tests

git clone https://github.com/simonswain/lancaster.git
cd lancaster
npm install
 
grunt

If you want to submit a pull request, please ensure it passes linting and tests first, and follows the existing coding style. Thanks!

var Lancaster = require('lancaster');
var api = Lancaster.server();

All api methods require a callback handler as their last argument. This will be passed an error and (if appropriate) a value.

Close the api client (disconnect from Redis) ready to exit.

Destroy everything in the topology

Add a node to the topology.

Will callback with an error if the provided id already exists.

Get a JSON representation of a given node.

Get all nodes in the topology

Delete a given node

Inject a message to a given node. The messages is placed on the end of he inbound message queue, and will be processed by the worker when all messages in front of it have been processed.

Take a message off the front of the message queue. Internally this is used by the worker to get the next message to be processed. You probably don't need to use this method.

Convenience method to get attrs (attrs('my-node-id')) or set attrs (attrs('my-node-id', {foo: 'bar'})). Setting attrs completely replaces any existing attrs for the node.

Replace the attrs on a given node

Get the attrs object from a given node

Delete the attrs for a given node. Used internally when deleting a node. You probably don't want to use this.

Make a given node listen to another node. id will receive all messages output by source_id.

Disconnect id from listening to source_id

Get all the nodes that id is listening to.

Replace any existing sources id has with the provided source ids.

Disconnect id from all nodes it's listening to.

Get the nodes that are listening to id

Latch a message to the output of a node (does not distribute the message). Used internally by worker. You probably don't want to use this method.

The the last message (if any) output by id. Then the node's fn is applied to a message, the output of that fn will be latched and available using getData.

Creates a listener on the topology that executes a callback on every message the topology emits.

var handle = function(methodidargs){
  // do stuff 
};
var listener = api.listen(handle);

The following methods being executed will trigger the handler. The handler will be passed a method name, node_id and the arguments used to call the method.

  • add
  • del
  • inject
  • setData
  • setAttrs
  • delAttrs
  • addSource
  • delSource
  • delSources

Get server config

{"lancaster":{"prefix":"df","env":"development","redis":{"host":"127.0.0.1","port":6379},"server":{"host":"127.0.0.1","port":4002}}}

Get server status

{"status":"ok"}

Ping test to check server is up

{pong: <unix-epoch-ms>}

POST /reset

Restore the topology to a pristine state (no nodes)

Get all nodes in the Topology

{'id': {node}, ...}

Get a specific node

{node}

POST /nodes

Add a node to topology

{
  id: 'xxx',
  fn:'fn-slug',
  sources: [id, id, id]
  '<attr>': <value>',
  '<attr>': <value>',
  ...
}

The id, fn and sources keys are reserved. Any other keys can be used for attrs. Attrs will be used by the fn to control it's operation.

POST /nodes/:id

Change attrs on a node. Cannot change id, sources or fn

{
  '<attr>': <value>',
  '<attr>': <value>',
  ...
}

POST /nodes/:id/message

Inject a message in to a node

{message}

Delete a specific node

Get message latched on node's output

{message}

Starting the REST server also make Websockets available.

Websocket methods allow the client to write to the topology.

Any changes on the topology are emitted to connected clients (an #api.listener is connected to the socket, with it's handler sending messages).

Commands and emitted data map on to the javascript api. Send a javascript array containing the method name, then the arguments for the method.

e.g.

["setAttrs", "my-node-id", {"my-value": 500}]

A client can alter a node by sending this message. When the message is processed, it will be emitted to all connected clients.

Messages emitted by nodes are sent over websockets by #setData.

["setAttrs", "my-node-id", {"value": 123}]

The following commands are supported over websockets:

  • reset
  • add
  • del
  • inject
  • setData
  • setAttrs
  • delAttrs
  • addSource
  • delSource
  • delSources

The reset command will issue as many dels are required to reset the topology.

npm install ws

Connect to server

var Ws = require('ws');
ws = new Ws('ws://' + config.server.host + ':' + config.server.port);

Receive messages

var handler = function(x){
  x = JSON.parse(x);
  var method = x[0];
  var id = x[1];
  var args = x[2];
};
ws.on('message', handler);

Send a command

var data = [
  'setAttrs',
  'my-node-id',
  {'my-value': 500}
 ];  
var s = JSON.stringify(data);
ws.send(s, function(){
  // done
});

Quit ws client

ws.terminate();

follow the quickstart example at the top of the README

~/lancaster$ node run

Check we are running

curl -i -X GET http://localhost:4002
HTTP/1.1 200 OK
content-type: application/json; charset=utf-8
content-length: 132
cache-control: no-cache
accept-ranges: bytes
Date: Fri, 19 Sep 2014 05:09:15 GMT
Connection: keep-alive
 
{"lancaster":{"prefix":"df","env":"development","redis":{"host":"127.0.0.1","port":6379},"server":{"host":"127.0.0.1","port":4002}}}

Status

curl -i -X GET http://localhost:4002/status
{"status":"ok"}

Ping

curl -i -X GET http://localhost:4002/ping
{"pong": 1386800817607}

Reset

Rest the topology (delete everything)

curl -i -X POST http://localhost:4002/reset

See we have no nodes

curl -i -X GET http://localhost:4002/nodes
{}

Try to get a node that doesn't exist

curl -i -X GET http://localhost:4002/nodes/unknown-node
 
HTTP/1.1 404 Not Found

Create a node

curl -i -H "Content-Type: application/json" -X POST http://localhost:4002/nodes -d '{"id":"my-node", "fn":"multiply","attrs":{"factor":10}}'

Get it

curl -i -X GET http://localhost:4002/nodes/my-node
{
  "my-node": {
    "fn": "multiply",
    "id": "my-node",
    "attrs": {
      "factor": "10"
    },
    "sources": [],
    "data": null
  }
}

Create another node, using the same fn with different attrs, and listen to the first one:

curl -i -H "Content-Type: application/json" -X POST http://localhost:4002/nodes -d '{"id":"other-node", "fn":"multiply","attrs":{"factor":2}, "sources":["my-node"]}'

Get all our nodes. They are returned as an object keyed by id.

These nodes have null data because they have not processed any messages yet.

curl -i -X GET http://localhost:4002/nodes
{
  "my-node": {
    "id": "my-node",
    "attrs": {
      "factor": 10
    },
    "fn": "multiply",
    "sources": [
      "my-node"
    ],
    "data": null
  },
  "other-node": {
    "id": "other-node",
    "attrs": {
      "factor": 2
    },
    "fn": "multiply",
    "sources": [
      "my-node"
    ],
    "data": null
  }
}

Inject a message to my-node. Since the worker is running, it will be processed immediately.

curl -i -H "Content-Type: application/json" -X POST http://localhost:4002/nodes/my-node/message -d '{"value":100}'

See how it gets processed by inspecting the data (latched outputs) of each node. The data field will contain the most recent message output by the node.

curl -i -X GET http://localhost:4002/nodes
{
  "my-node": {
    "id": "my-node",
    "attrs": {
      "factor": 10
    },
    "fn": "multiply",
    "sources": [
      "my-node"
    ],
    "data": {
      "value": 1000
    },
  },
  "other-node": {
    "id": "other-node",
    "attrs": {
      "factor": 2
    },
    "fn": "multiply",
    "sources": [
      "my-node"
    ],
    "data": {
      "value": 2000
    },
  }
}
  • 23/11/2013 0.0.1 Initial release

Copyright (c) 2014 Simon Swain Licensed under the MIT license.