@bunchtogether/chunked-stream-transformers

    1.0.2 • Public • Published

    Chunked Stream Transformers

    SerializeTransform transforms large binary chunks into small chunks limited to a maximum size.

    DeserializeTransform assembles the small chunks to recreate the original large chunks.

    The protocol implementation is conceptually similar to Real-time Transport Protocol (RTP).

    Useful for distributed systems where messages may arrive out of order.

    Extends the Node.js Transform and can be used with any stream. No dependencies.

    If you encounter an issue, fork the repository, write tests demonstrating the issue, and create a pull request.

    const crypto = require('crypto');
    const { 
      SerializeTransform, 
      DeserializeTransform, 
      ChunkTimeoutError, 
      ChunkIncompleteError 
    } = require('@bunchtogether/chunked-stream-transformers');
    
    const serializeTransform = new SerializeTransform({
      maxChunkSize: 1024 // bytes
    });
    
    const deserializeTransform = new DeserializeTransform({
      timeout: 1000 // ms
    });
    
    // 10 MB buffer
    const originalBuffer = crypto.randomBytes(10 * 1024 * 1024);
    
    // The serialize transform will chunk to "maxChunkSize"
    // inclusive of a 12 byte header used for chunk reordering
    serializeTransform.on('data', (chunk) => {
      // Chunks may be sent out of order
      setTimeout(() => {
        deserializeTransform.write(chunk);
      }, Math.random() * 100);
    });
    
    // The deserialize transform will reorder the chunks
    // and recreate the original buffer
    deserializeTransform.on('data', (receivedBuffer) => {
      originalBuffer.equals(receivedBuffer); // true
    });
    
    serializeTransform.write(originalBuffer);
    
    deserializeTransform.onIdle().then(() => {
      // Resolves immediately if no writes are active
      // or when all writes are complete
    });
    
    deserializeTransform.onActive().then(() => {
      // Resolves immediately if writes are active
      // or when a write begins
    });
    
    
    deserializeTransform.on('active', () => {
      // Active event is emitted when chunks start arriving
    });
    
    // Idle event is emitted when all chunks have been completed
    deserializeTransform.on('idle', () => {
      deserializeTransform.end(); // Safe to close the stream
    });
    
    deserializeTransform.on('error', (error) => {
      if(error instanceof ChunkTimeoutError) {
        // One of the writes timed out
      } else if(error instanceof ChunkIncompleteError) {
        // The transform was closed while a write was in progress
      }
    });
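
    To make the reordering idea in the example above concrete, here is a minimal, self-contained sketch that does not use this package. The header layout (message id, chunk index, chunk count, each a 32-bit big-endian integer) and the names splitIntoChunks and createReassembler are assumptions made for illustration only; the package's actual wire format is not documented here and may differ.

```javascript
// Hypothetical 12-byte header layout, chosen only for this sketch:
//   bytes 0-3   message id   (uint32, big-endian)
//   bytes 4-7   chunk index  (uint32, big-endian)
//   bytes 8-11  chunk count  (uint32, big-endian)
const HEADER_SIZE = 12;

// Split one buffer into chunks no larger than maxChunkSize, header included.
function splitIntoChunks(messageId, buffer, maxChunkSize) {
  const payloadSize = maxChunkSize - HEADER_SIZE;
  if (payloadSize <= 0) {
    throw new RangeError('maxChunkSize must be larger than the header');
  }
  const count = Math.max(1, Math.ceil(buffer.length / payloadSize));
  const chunks = [];
  for (let index = 0; index < count; index += 1) {
    const header = Buffer.alloc(HEADER_SIZE);
    header.writeUInt32BE(messageId, 0);
    header.writeUInt32BE(index, 4);
    header.writeUInt32BE(count, 8);
    const payload = buffer.subarray(index * payloadSize, (index + 1) * payloadSize);
    chunks.push(Buffer.concat([header, payload]));
  }
  return chunks;
}

// Accept chunks in any arrival order. Returns the reassembled buffer once
// every chunk of a message has arrived, otherwise null.
function createReassembler() {
  const pending = new Map(); // message id -> { parts, received }
  return function accept(chunk) {
    const messageId = chunk.readUInt32BE(0);
    const index = chunk.readUInt32BE(4);
    const count = chunk.readUInt32BE(8);
    let entry = pending.get(messageId);
    if (!entry) {
      entry = { parts: new Array(count), received: 0 };
      pending.set(messageId, entry);
    }
    if (entry.parts[index] === undefined) { // ignore duplicate chunks
      entry.parts[index] = chunk.subarray(HEADER_SIZE);
      entry.received += 1;
    }
    if (entry.received < count) {
      return null; // still waiting for more chunks
    }
    pending.delete(messageId);
    return Buffer.concat(entry.parts);
  };
}

// Usage: deliver the chunks in reverse order and recover the original.
const original = require('crypto').randomBytes(5000);
const chunks = splitIntoChunks(1, original, 1024);
const accept = createReassembler();
let result = null;
for (const chunk of chunks.slice().reverse()) {
  result = accept(chunk) || result;
}
console.log(result.equals(original)); // true
```

    Carrying the chunk index and count in every header lets the receiver place each chunk without relying on arrival order, which is the property the transforms in this package are described as providing.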

    Install

    yarn add @bunchtogether/chunked-stream-transformers

    API

    ChunkTimeoutError

    Extends Error

    Emitted by DeserializeTransform streams when the time elapsed since the last byte of a chunk was received exceeds the 'timeout' parameter

    ChunkIncompleteError

    Extends Error

    Emitted by DeserializeTransform streams that are ended while chunks are in progress

    SerializeTransform

    Extends Transform

    Ingests data of any size, emits consistently sized chunks containing a 12 byte header used by DeserializeTransform to reconstruct the original stream

    Parameters

    • options Object Transform stream options; see the Node.js stream documentation for the full list (optional, default {maxChunkSize:1316})
      • options.maxChunkSize number Maximum size in bytes of emitted chunks, including a 12 byte header. (optional, default 1316)
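
    Because maxChunkSize includes the 12 byte header, the payload carried per chunk is maxChunkSize minus 12. The sketch below estimates how many chunks a message produces. It assumes every chunk except the last is filled to maxChunkSize; estimateChunkCount is a hypothetical helper, not part of this package.

```javascript
const HEADER_SIZE = 12; // header size stated in this README

// Estimate the number of emitted chunks for one message, assuming each
// chunk except the last carries (maxChunkSize - HEADER_SIZE) payload bytes.
function estimateChunkCount(messageLength, maxChunkSize = 1316) {
  const payloadSize = maxChunkSize - HEADER_SIZE;
  if (payloadSize <= 0) {
    throw new RangeError('maxChunkSize must be larger than the header');
  }
  return Math.ceil(messageLength / payloadSize);
}

// The 10 MB buffer from the usage example with maxChunkSize 1024:
console.log(estimateChunkCount(10 * 1024 * 1024, 1024)); // 10362

// The same buffer with the default maxChunkSize of 1316:
console.log(estimateChunkCount(10 * 1024 * 1024)); // 8042
```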

    DeserializeTransform

    Extends Transform

    Ingests consistently sized chunks generated by SerializeTransform and emits the original, larger chunks

    Parameters

    • options Object Transform stream options; see the Node.js stream documentation for the full list (optional, default {timeout:5000})
      • options.timeout number Maximum time in milliseconds to wait after the last received byte of an in-progress chunk before a ChunkTimeoutError is emitted. (optional, default 5000)

    bytesRemaining

    Bytes remaining from active chunks

    Type: number

    Returns number

    onIdle

    Resolves when all chunks have completed

    Returns Promise<void>

    onActive

    Resolves when chunks are initially received

    Returns Promise<void>
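
    The idle and active semantics described above (resolve immediately if the condition already holds, otherwise resolve on the next matching event) can be sketched with a plain EventEmitter. ActivityTracker, begin, and finish are hypothetical names used for illustration; this is not the package's implementation.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: counts in-flight messages and exposes
// onIdle() / onActive() promises plus 'idle' / 'active' events.
class ActivityTracker extends EventEmitter {
  constructor() {
    super();
    this.inFlight = 0;
  }

  // Call when the first chunk of a new message arrives.
  begin() {
    this.inFlight += 1;
    if (this.inFlight === 1) {
      this.emit('active');
    }
  }

  // Call when a message has been fully reassembled.
  finish() {
    if (this.inFlight === 0) {
      return;
    }
    this.inFlight -= 1;
    if (this.inFlight === 0) {
      this.emit('idle');
    }
  }

  // Resolves immediately when nothing is in flight, otherwise on 'idle'.
  onIdle() {
    if (this.inFlight === 0) {
      return Promise.resolve();
    }
    return new Promise((resolve) => this.once('idle', resolve));
  }

  // Resolves immediately when something is in flight, otherwise on 'active'.
  onActive() {
    if (this.inFlight > 0) {
      return Promise.resolve();
    }
    return new Promise((resolve) => this.once('active', resolve));
  }
}

const tracker = new ActivityTracker();
const events = [];
tracker.on('active', () => events.push('active'));
tracker.on('idle', () => events.push('idle'));
tracker.begin();  // first message starts: 'active'
tracker.begin();  // second message starts: no new event
tracker.finish();
tracker.finish(); // last message completes: 'idle'
console.log(events); // [ 'active', 'idle' ]
```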


    License

    MIT
