    DynamoDB output stream

    The DynamoDB output stream aims to cover as many operations as possible through the AWS SDK's DynamoDB module, for use with mutator-io.


    npm i mutator-io-plugin-out-dynamodb

    Ideally, this should leverage RxJS to perform fail-safe operations such as batchWriteItem. That way, the logic for retrying failed calls can be hidden from the user (e.g. consuming the UnprocessedItems returned by the standard BatchWriteItem call until all of them are written).
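
    The retry idea described above can be sketched as follows. This is only an illustration, not the plugin's implementation: it uses plain promises instead of RxJS to stay dependency-free, and the names writeAll, BatchWriter and maxAttempts are hypothetical. The BatchWriter function stands in for a single BatchWriteItem call.

```typescript
// Hypothetical, trimmed-down view of a BatchWriteItem request/response.
type WriteRequest = Record<string, unknown>
type RequestItems = Record<string, WriteRequest[]>

interface BatchWriteResult {
  UnprocessedItems?: RequestItems
}

// Any function that performs one BatchWriteItem call (assumption: a thin wrapper around the SDK).
type BatchWriter = (items: RequestItems) => Promise<BatchWriteResult>

// Resubmit whatever comes back in UnprocessedItems until nothing is left
// or the attempt budget is exhausted.
async function writeAll(
  write: BatchWriter,
  items: RequestItems,
  maxAttempts = 5
): Promise<void> {
  let pending = items
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await write(pending)
    const unprocessed = result.UnprocessedItems || {}
    if (Object.keys(unprocessed).length === 0) return
    pending = unprocessed
  }
  throw new Error('Items still unprocessed after ' + maxAttempts + ' attempts')
}
```

    A production version would normally also wait between attempts (for example with exponential backoff) rather than resubmitting immediately.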

    The required configuration is exactly the same as that of the original SDK.

    There's an extra custom parameter called IGNORE_ERRORS: a list of error codes (and, optionally, messages) to ignore, so that expected errors don't bloat the logs.

    Here's an example:

    new DynamoDB({
        // Errors matching these entries are not logged
        IGNORE_ERRORS: [
            { code: 'ConditionalCheckFailedException' },
            { code: 'ValidationException', message: 'specific validation message' }
        ]
    } as DynamoDB.Config)
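
    To make the matching rule concrete, here is a minimal sketch of how an error could be checked against such a list. The shouldIgnore function and the IgnoredError interface are hypothetical, and the sketch assumes messages are compared by exact equality; the plugin may match differently (e.g. by substring).

```typescript
interface IgnoredError {
  code: string
  message?: string
}

// Hypothetical matcher: an error is ignored when its code matches an entry and,
// if that entry also specifies a message, the message matches exactly.
function shouldIgnore(
  err: { code?: string; message?: string },
  ignored: IgnoredError[]
): boolean {
  return ignored.some(
    (rule) =>
      rule.code === err.code &&
      (rule.message === undefined || rule.message === err.message)
  )
}

const IGNORE_ERRORS: IgnoredError[] = [
  { code: 'ConditionalCheckFailedException' },
  { code: 'ValidationException', message: 'specific validation message' }
]
```

    With this rule, every ConditionalCheckFailedException is ignored, while a ValidationException is ignored only when it carries that specific message.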

    The create method returns a function that accepts a parameter of the custom Message type:

    enum Operations {
      PUT = 'put',
      DELETE = 'delete',
      UPDATE = 'update'
    }

    interface RetryDelay {
      (msg: any): Observable<number>
    }

    interface Message {
      operation: Operations
      params: Object
      retry?: number
      retryDelay?: RetryDelay
    }

    As long as the transformation returns an object shaped this way, this output stream performs the specified operation (one of the Operations) on the DynamoDB instance. If it succeeds, it returns the same message it was sent as output; otherwise it fires the stream's error callback.
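
    As an illustration of the message shape for an operation other than PUT, the sketch below builds a DELETE message. It redeclares trimmed local copies of the types so that it is self-contained; deleteMessage is a hypothetical helper, and the params layout (TableName plus Key) assumes the plugin forwards params in the SDK's DocumentClient format.

```typescript
// Local, trimmed copies of the plugin's types, so the sketch is self-contained.
enum Operations {
  PUT = 'put',
  DELETE = 'delete',
  UPDATE = 'update'
}

interface Message {
  operation: Operations
  params: Object
  retry?: number
}

// Hypothetical helper: build a DELETE message for the item with the given id.
function deleteMessage(tableName: string, id: string): Message {
  return {
    operation: Operations.DELETE,
    // Assumption: DocumentClient-style delete parameters
    params: { TableName: tableName, Key: { id } },
    retry: 3
  }
}

const exampleMessage = deleteMessage('test_table', 'abc-123')
```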

    Transformation example:

    import * as DynamoDBOutputStream from 'mutator-io-plugin-out-dynamodb'

    mutatorIOInstance.transform('myPipeName', (msg): DynamoDBOutputStream.Message => {
      // Parameters for the put operation
      const params = {
        TableName: 'test_table',
        Item: {
          id: msg.payload,
          NumAttribute: 1,
          BoolAttribute: true,
          ListAttribute: [1, 'two', false],
          MapAttribute: { foo: 'bar' },
          NullAttribute: null
        }
      }

      // The returned object must match the Message shape
      return {
        operation: outputStreams.DynamoDB.Operations.PUT,
        params
      }
    })

    You can optionally specify a retry parameter, which makes the output stream retry the write operation N times in case of failure. The optional retryDelay parameter should be a function returning an observable (e.g. (msg) => Rx.Observable.of(2000)). It is called every time an error comes in, allowing the user to set dynamic delays between retries (e.g. based on something in the message, or by performing more complex async operations to determine the delay to apply).
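
    The retry behaviour can be pictured with the sketch below. It is not the plugin's code: it swaps observables for promises to avoid a dependency on RxJS, and withRetry, DelayFn and sleep are hypothetical names. It assumes that retry counts additional attempts after the first failure, and that the delay function is consulted once per failure.

```typescript
// Hypothetical stand-in for RetryDelay: resolves to a delay in milliseconds.
type DelayFn<M> = (msg: M) => Promise<number>

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

// Run `operation`; on failure, ask `retryDelay` how long to wait, then try again,
// up to `retry` additional attempts.
async function withRetry<M>(
  msg: M,
  operation: (msg: M) => Promise<void>,
  retry = 0,
  retryDelay: DelayFn<M> = async () => 0
): Promise<M> {
  for (let attempt = 0; ; attempt++) {
    try {
      await operation(msg)
      return msg // on success, hand back the original message
    } catch (err) {
      if (attempt >= retry) throw err // retries exhausted: surface the error
      await sleep(await retryDelay(msg)) // delay is computed per failure
    }
  }
}
```

    Because the delay function receives the message and is asynchronous, it can derive the delay from the message contents or from another async lookup, which is the flexibility retryDelay is meant to provide.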

