@bunchtogether/chunked-stream-transformers

1.0.2 • Public • Published

Chunked Stream Transformers

CircleCI npm version

SerializeTransform transforms large binary chunks into small chunks limited to a maximum size.

DeserializeTransform assembles the small chunks to recreate the original large chunks.

The protocol implementation is conceptually similar to Real-time Transport Protocol (RTP).

Useful for distributed systems where messages may arrive out of order.

Extends the Node.js Transform and can be used with any stream. No dependencies.

If you encounter an issue, fork the repository, write tests demonstrating the issue, and create a pull request.

const crypto = require('crypto');
const { 
  SerializeTransform, 
  DeserializeTransform, 
  ChunkTimeoutError, 
  ChunkIncompleteError 
} = require('@bunchtogether/chunked-stream-transformers');

const serializeTransform = new SerializeTransform({
  maxChunkSize: 1024 // bytes
});

const deserializeTransform = new DeserializeTransform({
  timeout: 1000 // ms
});

// 10 MB buffer
const originalBuffer = crypto.randomBytes(10 * 1024 * 1024);

// The serialize transform will chunk to "maxChunkSize"
// inclusive of a 12 byte header used for chunk reordering
serializeTransform.on('data', (chunk) => {
  // Chunks may be sent out of order
  setTimeout(() => {
    deserializeTransform.write(chunk);
  }, Math.random() * 100);
});

// The deserialize transform will reorder the chunks
// and recreate the original buffer
deserializeTransform.on('data', (receivedBuffer) => {
  originalBuffer.equals(receivedBuffer); // true
});

serializeTransform.write(originalBuffer);

deserializeTransform.onIdle().then(() => {
  // Resolves immediately if no writes are active
  // or when all writes are complete
});

deserializeTransform.onActive().then(() => {
  // Resolves immediately if writes are active
  // or when a write begins
});


deserializeTransform.on('active', () => {
  // Active event is emitted when all chunks start  
});

// Idle event is emitted when all chunks have been completed
deserializeTransform.on('idle', () => {
  deserializeTransform.end(); // Safe to close the stream
});

deserializeTransform.on('error', (error) => {
  if(error instanceof ChunkTimeoutError) {
    // One of the writes timed out
  } else if(error instanceof ChunkIncompleteError) {
    // The transform was closed while a write was in progress
  }
});

Install

yarn add @bunchtogether/chunked-stream-transformers

API

Table of Contents

ChunkTimeoutError

Extends Error

Emitted by DeserializeTransform streams when the time after the last byte in a chunk received exeeds the 'timeout' parameter

ChunkIncompleteError

Extends Error

Emitted by DeserializeTransform streams that are ended while chunks are in progress

SerializeTransform

Extends Transform

Ingests data of any size, emits consistently sized chunks containing a 12 byte header used by DeserializeTransform to reconstruct the original stream

Parameters

  • options Object Transform stream options, see Node.js documentation for full documentation (optional, default {maxChunkSize:1316})
    • options.maxChunkSize number Maximum size in bytes of emitted chunks, including a 12 byte header. (optional, default 1316)

DeserializeTransform

Extends Transform

Ingests consistently sized chunks generated by SerializeTransform and emits the original, larger chunks

Parameters

  • options Object Transform stream options, see Node.js documentation for full documentation (optional, default {timeout:5000})
    • options.timeout number Maximum size in bytes of emitted chunks, including a 12 byte header. (optional, default 5000)

bytesRemaining

Bytes remaining from active chunks

Type: number

Returns number

onIdle

Resolves when all chunks have completed

Returns Promise<void>

onActive

Resolves when chunks are initially received

Returns Promise<void>

Readme

Keywords

none

Package Sidebar

Install

npm i @bunchtogether/chunked-stream-transformers

Weekly Downloads

12

Version

1.0.2

License

MIT

Unpacked Size

322 kB

Total Files

32

Last publish

Collaborators

  • bunchtogether