bus.io

An express inspired, event-driven framework for building real time distributed applications over socket.io and redis.

An express inspired, event-driven framework for building real-time distributed apps with socket.io and redis.

var express = require('express');
var app = express();
app.use(express.static(__dirname+'/public'));
var server = require('http').Server(app).listen(3000);
var bus = require('bus.io')(server);
<script type="text/javascript" src="/bus.io/bus.io.js"></script>
<script type="text/javascript">
  var client = io.connect();
  client.on('connect', function () {
    client.emit('echo', 'Hello, World!');
  });
  client.on('echo', function (msg) {
    console.log(msg.content());
  });
</script> 

Features

  • An event-driven architecture provides scalability.
  • Socket events are encapsulated as Message objects.
  • Message objects are evenly distributed over all running bus.io app processes.
  • Standard interface for creating, handling, propagating, and consuming messages.
  • Sockets are associated to actors because messages are delivered to actors.
  • Express like routing and error handling.

How this works

Each socket is associated with one ore more actors. When a socket receives data, the data is encapsulated as a messsage and written to a queue. Since all of your app instances are connected to that queue, one of them will receive the message for processing. After the instance processes the message it can be delivered to the target. A target is just another actor, so if your actor is associated with multiple sockets. Each socket, regardless of which app instance it is connected to, will receive the data from the message.

Installation and Environment Setup

Install node.js (See download and install instructions here: http://nodejs.org/).

Install redis (See download and install instructions http://redis.io/topics/quickstart).

> npm install bus.io

Clone this repository.

> git clone git@github.com:turbonetix/bus.io.git

cd into the directory and install the dependencies

> cd bus.io
> npm install && npm shrinkwrap --dev

API

The Server connects the Messages instance to the `Exchange instance. It also provides ways to bind middleware functions to manipulate messages incomming from the client to the bus, messages processing on the bus, and finally messages outgoing from the bus to the client.

var Bus = require('bus.io');

The current version of the software.

console.log(bus.version);

The Server exposes it self.

var bus = require('bus.io').Server(3000);

The Exchange class queues and propagates messages to your Server. See bus.io-exchange for more information.

var Exchange = require('bus.io').Exchange;

The Messages class is the interface between the Server and the client. See bus.io-messages for more information.

var Messages = require('bus.io').Messages;
var bus = require('bus.io')();
var bus = require('bus.io')(3000);
var io = require('socket.io')();
var bus = require('bus.io')(io);
var server = require('http').createServer(function (reqres) {}).listen(function (err) {});
var bus = require('bus.io')(server)

Sets the function that will grab the actor. The default implementation will use the sock.id. This method is called when the socket connection is established.

bus.actor(function (sockcb) {
  cb(null, sock.id);
});

The callback cb takes two parameters err and actor.

You may pass an Error object for the first argument if you encounter an error or would like to trigger one.

bus.actor(function (sockcb) {
  sock.get('user', function (erruser) {
    if (err) return cb(err);
    if (!user) return cb(new Error('Need to login'));
    return cb(null, user.name);
  });
});

Gets the function that will grab the actor from a socket.

var actorFn = bus.actor();

Sets the function that will grab the target from the request. The default implementation will use the sock.id. This method is called for each request from the sock.

The client would emit this.

sock.emit('shout', 'hello', 'You');

We would like "You" to be the actor.

bus.target(function (sockparamscb) {
  cb(null, params.pop());
});

If you encounter an error you can also pass one along.

bus.target(function (sockparamscb) {
  if (params.length === 0) {
    cb(new Error('You are you talking to?!'));
  }
  else {
    cb(null, params.pop());
  }
});

You get to decide your own convention.

Gets the method that will grab the target from the request.

var targetFn = bus.target();

This method will allow you to bind a function to the connection event that socket.io supports.

e.g.

We would like to tell the client "Hello" when they connect.

bus.socket(function (sockbus) {
  sock.emit('greet', 'Hello');
});

With alias your actor will receive messages whenever their alias receives one. This is useful if you want to associate a socket to a logged in user.

bus.alias(sock, 'nathan');

A good place to do this is when the client is connected to the server.

bus.socket(function (sockbus) {
  sock.get('user', function (erruser) {
    if (err) return sock.emit('error', err);
    if (!user) return sock.emit('login', 'You must login');
    bus.alias(sock, user.name);
  });
});

With unalias your actor will no longer receive messages whenever their alias receives one. This is useful if you want to control messages going to your socket.

bus.unalias(sock, 'nathan');

An example is if you do not want to listen to messages from another user.

bus.in('stop following', function (msgsocknext) {
  bus.unalias(sock, msg.target());
  next();
});

The in method will use the passed function(s) when a message is received from the sock. This allows you to modify the message before it is sent to the exchange.

bus.in(function (msgsocknext) {
  msg.content([msg.content()[0].toLowerCase()]);
  next();
});

You can pass in multiple functions or arrays of functions.

bus.in(function (a,b,c) {...}, function (a,b,c) {...}, [function (a,b,c) {...}, function(a,b,c) {...}]);

You can set up handlers for specific messages.

bus.in('chat', function (msgsocknext) {
  // do something 
  next();
});

If you bind multiple handlers they will be called in this order

bus.in('chat', function (msgsocknext) {
  msg.content('A');
  next();
});
 
bus.in(function (msgsocknext) {
  msg.content(msg.content()+'B');
});
 
bus.in('chat', function (msgsocknext) {
  msg.content(msg.content()+'C');
  next();
});
 
bus.in(function (errmsgsocknext) {
  console.error(err);
  msg.errored(err);
});
 
// The output of msg.content() will be 'ABC'; 

You can control propagation with consume(), deliver(), respond(), errored() as well.

bus.in(function (msgsocknext) {
  msg.deliver();
});
 
bus.in(function (msgsocknext) {
  // will not be called because the message will delivered to the target as a result of calling deliver!! 
});
 
bus.in(function (msgsocknext) {
  msg.consume();
  // the message will just die here 
});

The on method binds a handler to the queue. The handler will process each message and give you the ability to either deliver the message or discard it. That is up to your application requirements.

bus.on('some event', function (msg) {
  msg.deliver();
});

Or you can use the optional next parameter. You may eiter call next() to invoke the next handler. Or you may call msg.deliver(), msg.respond(), or msg.consumed() to control the message's propagation.

bus.on('some*', function (msgnext) {
  // do something! 
  next();
});

The out method will use the passed function(s) when a message is received from the exchange. This allows you to modify the message before it is sent to the sock.

Here you could save the message to a mongo store using mongoose.

//assuming you have mongoose and a message model 
var Message = monngose.model('Message');
 
bus.out(function (msgsocknext) {
  new Message(msg.data).save(function (err) {
    if (err) return next(err);
    next();
  });
});

You can pass in multiple functions or arrays of functions.

bus.out(function (a,b,c) {...}, function (a,b,c) {...}, [function (a,b,c) {...}, function(a,b,c) {...}]);

You can set up handlers for specific messages.

bus.out('chat', function (msgsocknext) {
  // do something 
  next();
});

If you bind multiple handlers they will be called in this order

bus.out('chat', function (msgsocknext) {
  msg.content('A');
  next();
});
 
bus.out(function (msgsocknext) {
  msg.content(msg.content()+'B');
});
 
bus.out('chat', function (msgsocknext) {
  msg.content(msg.content()+'C');
  next();
});
 
bus.out(function (errmsgsocknext) {
  console.error(err);
  msg.errored(err);
});
 
assert.equal(msg.content(), 'ABC');

You can control propagation with consume(), deliver(), respond(), errored() as well.

bus.out(function (msgsocknext) {
  msg.deliver();
});
 
bus.out(function (msgsocknext) {
  // will not be called because the message will delivered to the target as a result of calling deliver!! 
});
 
bus.out(function (msgsocknext) {
  msg.consume();
  // the message will just die here 
});

You can either pass a port, server, or socket.io instance.

bus.listen(3000);
 
bus.listen(require('http').createServer(function (reqres) { }));
 
bus.listen(require('socket.io')());

This method is a convenient way to deliver a message.

bus.deliver({actor:'I', action:'say', content:'hello', 'you'});

This is an alias to message().

This will create you an object for building a message that you can deliver. The data can either be an object or an instanceof of Message.

bus.msg({
  actor:'I',
  action:'say',
  content:'hello'
  target:'you',
}).deliver();

A chain-able approach.

bus.msg()
  .actor('me')
  .action('say')
  .content('hello')
  .target('you')
  .deliver();

Simply put.

bus.msg()
  .i('me')
  .did('say')
  .what('hello')
  .to('you');

Gets the exchange the server uses to publish information.

See bus.io-exchange

var exchange = bus.exchange();

Sets the exchange the server uses to publish information.

See bus.io-exchange

for more information.

var exchange = require('bus.io').Exchange();
bus.exchange(exchange);

Gets the Queue the Exchange uses.

var queue = exchange.queue();

Sets the Queue the Exchange uses.

See bus.io-exchange

var queue = require('bus.io').Exchange.Queue();
bus.queue(queue);

Gets the PubSub the Exchange uses.

See bus.io-exchange

var pubsub = exchange.pubsub();

Sets the PubSub the Exchange uses.

var pubsub = require('bus.io').Exchange.PubSub();
bus.pubsub(pubsub);

Instead of having to write a function to deliver a message like this.

bus.on(/some message/, function (msg) {
  msg.deliver();
});

We could call autoPropagate(true) so that any method we have not declared a handler for will automatically be propagated.

bus.autoPropagate(true);

Auto-propagation is on by default. You may turn it off to prevent unwanted messages from going into your bus.

This method will pass the bus instance into your function. Your function can then do whatever it needs to attach it self to your bus.

In this example we create a middleware that will emit an event whenever we handle a message going in(), on(), or out() of the bus.

function tracker (emitter) {
  return function (bus) {
    function handler (event) {
      return function (msgnext) {
        emitter.emit('track', event, msg);
        next();
      }
    }
    bus.in(handler('in'));
    bus.on(handler('on'));
    bus.on(handler('out'));
  }
}
 
var receiver = new require('events').EventEmitter();
 
report.on('track', function (pointmsg) {
  this.data = this.data || {};
  this.data[point] = this.data[point] || {};
  this.data[point][msg.action()] = this.data[point][msg.action()] || 0;
  this.data[point][msg.action()] += 1;
});
 
report.on('report', function () {
  console.log(this.data);
});
 
setInterval(function () {
  report.emit('report');
}, 1000);
 
bus.use(tracker(report));

bus.io is broken down into other components.

  • bus.io-common contains all the common code such as the Message, Builder, and Controller classes.
  • bus.io-exchange contains all the code that will be used to handle messages going into the Queue and propagation on the PubSub.
  • bus.io-messsages contains the code that handles listening to a socket.io Socket for an event and building that into a message.
  • bus.io-receiver contains the code that handles receiving messages in, on, and out the bus.
  • bus.io-client is a wrapper for socket.io-client that provides the bus.io-common interface on the client.

Test drive your apps with bus.io-driver.

  • bus.io-driver The driver helps you test driver bus.io in your apps.

Middleware components you can use in your apps.

  • bus.io-monitor The monitor helps your bus.io apps report on the messages being processed.
  • bus.io-session The session is used to maintain state for socket connections and multiple servers.

Running Tests

Install coffee-script

> npm install coffee-script -g

Tests are run using grunt. You must first globally install the grunt-cli with npm.

> sudo npm install -g grunt-cli

To run the tests, just run grunt

> grunt spec

Working Examples and Demos

You will need a redis server up and running to run the demos at this time.

Check out the (README)(https://github.com/turbonetix/bus.io/master/examples "examples") for the examples

> node examples/echo.js

Demos are under the /demo directory. There is currently a basic chat program.

TODO

There are open issues if you would like to contribute please fork and send me a pull request!