@bunchtogether/chunked-stream-transformers

1.0.2 • Public • Published

Chunked Stream Transformers

CircleCI npm version

SerializeTransform transforms large binary chunks into small chunks limited to a maximum size.

DeserializeTransform assembles the small chunks to recreate the original large chunks.

The protocol implementation is conceptually similar to Real-time Transport Protocol (RTP).

Useful for distributed systems where messages may arrive out of order.

Extends the Node.js Transform and can be used with any stream. No dependencies.

If you encounter an issue, fork the repository, write tests demonstrating the issue, and create a pull request.

const crypto = require('crypto');
const { 
  SerializeTransform, 
  DeserializeTransform, 
  ChunkTimeoutError, 
  ChunkIncompleteError 
} = require('@bunchtogether/chunked-stream-transformers');

const serializeTransform = new SerializeTransform({
  maxChunkSize: 1024 // bytes
});

const deserializeTransform = new DeserializeTransform({
  timeout: 1000 // ms
});

// 10 MB buffer
const originalBuffer = crypto.randomBytes(10 * 1024 * 1024);

// The serialize transform will chunk to "maxChunkSize"
// inclusive of a 12 byte header used for chunk reordering
serializeTransform.on('data', (chunk) => {
  // Chunks may be sent out of order
  setTimeout(() => {
    deserializeTransform.write(chunk);
  }, Math.random() * 100);
});

// The deserialize transform will reorder the chunks
// and recreate the original buffer
deserializeTransform.on('data', (receivedBuffer) => {
  originalBuffer.equals(receivedBuffer); // true
});

serializeTransform.write(originalBuffer);

deserializeTransform.onIdle().then(() => {
  // Resolves immediately if no writes are active
  // or when all writes are complete
});

deserializeTransform.onActive().then(() => {
  // Resolves immediately if writes are active
  // or when a write begins
});


deserializeTransform.on('active', () => {
  // Active event is emitted when all chunks start  
});

// Idle event is emitted when all chunks have been completed
deserializeTransform.on('idle', () => {
  deserializeTransform.end(); // Safe to close the stream
});

deserializeTransform.on('error', (error) => {
  if(error instanceof ChunkTimeoutError) {
    // One of the writes timed out
  } else if(error instanceof ChunkIncompleteError) {
    // The transform was closed while a write was in progress
  }
});

Install

yarn add @bunchtogether/chunked-stream-transformers

API

Table of Contents

ChunkTimeoutError

Extends Error

Emitted by DeserializeTransform streams when the time after the last byte in a chunk received exeeds the 'timeout' parameter

ChunkIncompleteError

Extends Error

Emitted by DeserializeTransform streams that are ended while chunks are in progress

SerializeTransform

Extends Transform

Ingests data of any size, emits consistently sized chunks containing a 12 byte header used by DeserializeTransform to reconstruct the original stream

Parameters

  • options Object Transform stream options, see Node.js documentation for full documentation (optional, default {maxChunkSize:1316})
    • options.maxChunkSize number Maximum size in bytes of emitted chunks, including a 12 byte header. (optional, default 1316)

DeserializeTransform

Extends Transform

Ingests consistently sized chunks generated by SerializeTransform and emits the original, larger chunks

Parameters

  • options Object Transform stream options, see Node.js documentation for full documentation (optional, default {timeout:5000})
    • options.timeout number Maximum size in bytes of emitted chunks, including a 12 byte header. (optional, default 5000)

bytesRemaining

Bytes remaining from active chunks

Type: number

Returns number

onIdle

Resolves when all chunks have completed

Returns Promise<void>

onActive

Resolves when chunks are initially received

Returns Promise<void>

Dependencies (0)

    Dev Dependencies (23)

    Package Sidebar

    Install

    npm i @bunchtogether/chunked-stream-transformers

    Weekly Downloads

    0

    Version

    1.0.2

    License

    MIT

    Unpacked Size

    322 kB

    Total Files

    32

    Last publish

    Collaborators

    • bunchtogether