Fluent octet-stream handling

Fluent Stream

Parsing octet data from a stream or buffer can be painful; fluent-stream aims to make it easier.

var fluent = require('fluent-stream').create()
    // pop 2 bytes
    .pop(2, function (buf) {
        console.log(buf)        // log: <Buffer 01 02>
    })
    // async pop: declaring a second parameter makes the callback async
    .pop(1, function (buf, done) {
        setTimeout(function () {
            console.log(buf)    // log: <Buffer 03>
            done()              // finish this pop
        }, 100)
    })
    // unpredictable length
    .pop(1, function (buf) {
        console.log(buf)        // log: <Buffer 04>
        return 2 // next pop length
    })
    .pop(function (buf, done) { // 2 bytes
        console.log(buf)        // log: <Buffer 05 06>
        done(2) // next pop length
    })
    .pop(function (buf) {
        console.log(buf)        // log: <Buffer 07 08>
    })
    // remaining data will be emitted (Stream)
    .on('data', function (buf) {
        console.log(buf)        // log: <Buffer 09>
    })

// write or pipe data to it
fluent.write(Buffer.from([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09]))

$ npm install fluent-stream

var FluentStream = require('fluent-stream')

  • buffer <Buffer>

Returns a fluent-stream object, which is a duplex stream.

  • nbytes <Number> : default 1
  • callback(buffer, [done]) <Function>

Pops a buffer of nbytes bytes and calls callback once fluent has received enough data (at least nbytes bytes).

If no callback is given, the popped data is discarded.

If the callback declares more than one parameter, it is treated as an async callback; you must call done (the second parameter) to finish this pop.

If the callback yields a Number (sync: return n, async: done(n)), the next pop will pop n bytes. This is very useful for handling buffers of unpredictable length.
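
The length-chaining rule can be illustrated without the library. The following is a minimal sketch, not fluent-stream's actual implementation: createPopQueue is a hypothetical helper that handles only synchronous callbacks, and it assumes that an explicit nbytes takes precedence over a length returned by the previous callback.

```javascript
// Hypothetical helper, not part of fluent-stream: a tiny synchronous pop queue.
function createPopQueue() {
    var pending = Buffer.alloc(0)   // bytes received but not yet consumed
    var tasks = []                  // queued { nbytes, callback } entries
    var nextLength = null           // length returned by the previous callback

    function drain() {
        while (tasks.length > 0) {
            var task = tasks[0]
            // Assumption: explicit nbytes first, then the returned length, then 1
            var n = task.nbytes !== undefined ? task.nbytes
                  : nextLength !== null ? nextLength
                  : 1
            if (pending.length < n) return      // wait for more bytes
            tasks.shift()
            var chunk = pending.subarray(0, n)
            pending = pending.subarray(n)
            var result = task.callback ? task.callback(chunk) : undefined
            nextLength = typeof result === 'number' ? result : null
        }
    }

    var queue = {
        pop: function (nbytes, callback) {
            if (typeof nbytes === 'function') {
                callback = nbytes
                nbytes = undefined
            }
            tasks.push({ nbytes: nbytes, callback: callback })
            drain()
            return queue
        },
        write: function (data) {
            pending = Buffer.concat([pending, data])
            drain()
            return queue
        }
    }
    return queue
}

var seen = []
createPopQueue()
    .pop(1, function (header) {
        seen.push(header)
        return header[0]        // the first byte says how long the body is
    })
    .pop(function (body) {
        seen.push(body)
    })
    .write(Buffer.from([0x03, 0x61, 0x62]))   // body incomplete, second pop waits
    .write(Buffer.from([0x63]))               // now all 3 body bytes are available

console.log(seen)   // log: [ <Buffer 03>, <Buffer 61 62 63> ]
```

The second pop does not fire after the first write because only two of the three announced body bytes have arrived; it runs as soon as the missing byte is written.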

fluent has a 'stash' property, an Object in which all stashed buffers are stored.

  • nbytes <Number> : default 1
  • stashName <String>

Pops a buffer and stores it in 'stash' under the key 'stashName'.

  • callback(stash) <Function>

Calls callback with the stash, through which you can access the stashed buffers, for example console.log(stash.stashName).

More than a dozen methods pre-parse the popped buffer into the corresponding type, such as int, float or double:

fluent.readInt8(function (value) {
    console.log(value) // log: 1
})
fluent.write(Buffer.from([0x01]))

They are…

  • readUInt8
  • readUInt16LE
  • readUInt16BE
  • readUInt32LE
  • readUInt32BE
  • readInt8
  • readInt16LE
  • readInt16BE
  • readInt32LE
  • readInt32BE
  • readFloatLE
  • readFloatBE
  • readDoubleLE
  • readDoubleBE

For details, see the Node Buffer documentation.
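
Since these method names mirror Node's own Buffer read methods, a quick look at the Buffer versions shows what values to expect. This sketch uses only the standard Buffer API (fluent-stream is not involved) and assumes the fluent methods decode bytes the same way.

```javascript
var bytes = Buffer.from([0x01, 0x02, 0xff, 0xff])

var le = bytes.readUInt16LE(0)      // little-endian: 0x0201
var be = bytes.readUInt16BE(0)      // big-endian: 0x0102
var unsigned = bytes.readUInt8(2)   // 0xff read as unsigned
var signed = bytes.readInt8(2)      // 0xff read as two's complement

console.log(le, be, unsigned, signed)   // log: 513 258 255 -1
```

The same two bytes yield different numbers depending on the LE or BE suffix, so pick the variant that matches the byte order of the protocol being parsed.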

  • callback(value, [done]) <Function>
  • stashName <String>

FluentStream implements a duplex stream: you can pipe a readable stream into it and pipe it on to another writable stream.

Fluent pops buffers of the specified lengths, then emits the remaining data.
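
To make the pop-then-emit behavior concrete, here is a rough sketch built on Node's stream.Transform rather than on fluent-stream itself. createHeaderSplitter is a hypothetical helper: it consumes a fixed-length header, hands it to a callback, and passes every later byte downstream. It only approximates the idea and says nothing about how fluent-stream is implemented internally.

```javascript
var Transform = require('stream').Transform

// Hypothetical helper, not part of fluent-stream.
function createHeaderSplitter(headerLength, onHeader) {
    var pending = Buffer.alloc(0)   // header bytes collected so far
    var headerDone = false

    return new Transform({
        transform: function (chunk, encoding, callback) {
            // once the header is consumed, everything passes straight through
            if (headerDone) return callback(null, chunk)

            pending = Buffer.concat([pending, chunk])
            if (pending.length < headerLength) return callback()   // wait for more

            headerDone = true
            onHeader(pending.subarray(0, headerLength))
            var rest = pending.subarray(headerLength)
            pending = null
            if (rest.length > 0) return callback(null, rest)
            callback()
        }
    })
}

var header = null
var splitter = createHeaderSplitter(2, function (buf) { header = buf })

splitter.write(Buffer.from([0x01]))               // header still incomplete
splitter.write(Buffer.from([0x02, 0x03, 0x04]))   // completes header, 2 bytes remain

var remaining = splitter.read()
console.log(header, remaining)   // log: <Buffer 01 02> <Buffer 03 04>
```

Because the result is an ordinary stream, it can sit in the middle of a pipe chain in the same way the text describes for FluentStream.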


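var http = require('http')
  , FluentStream = require('fluent-stream')

http.createServer(function (req, res) {
    // assumed wiring: pipe the request through fluent, then on to the response
    req.pipe(FluentStream.create())
        .pop(1, function (buf) {
            // do something...
        })
        .pipe(res)
})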