
datapumps

Datapumps: Simple ETL for node.js

Overview

Use pumps to import, export, transform or transfer data. A data pump will read from its input stream, array or datapumps Buffer and will write to its output buffers. A pump will finish when all data is consumed from its output buffers. Make a group of pumps to handle complex ETL tasks.

Installation

$ npm install datapumps --save

Usage example: export mongodb to excel

var
  datapumps = require('datapumps'),
  Pump = datapumps.Pump,
  MongodbMixin = datapumps.mixin.MongodbMixin,
  ExcelWriterMixin = datapumps.mixin.ExcelWriterMixin,
  pump = new Pump();
 
pump
  .mixin(MongodbMixin('mongodb://localhost/marketing'))
  .useCollection('Contact')
  .from(pump.find({ country: "US" }))
 
  .mixin(ExcelWriterMixin())
  .createWorkbook('/tmp/ContactsInUs.xlsx')
  .createWorksheet('Contacts')
  .writeHeaders(['Name', 'Email'])
 
  .process(function(contact) {
    return pump.writeRow([ contact.name, contact.email ]);
  })
  .logErrorsToConsole()
  .run()
    .then(function() {
      console.log("Done writing contacts to file");
    });

Usage example with more details:

  • First, we create a pump and set up reading from mongodb:

    var pump = new Pump();
    pump
      .mixin(MongodbMixin('mongodb://localhost/marketing'))
      .useCollection('Contact')
      .from(pump.find({ country: "US" }))

    Mixins extend the functionality of a pump. The MongodbMixin adds a .find() method, which executes a query on the collection specified with the .useCollection() method. The pump reads the query results and controls the data flow, i.e. it pauses reading when it cannot write excel rows.

  • Write data to excel with ExcelWriterMixin:

    pump
      .mixin(ExcelWriterMixin())
      .createWorkbook('/tmp/ContactsInUs.xlsx')
      .createWorksheet('Contacts')
      .writeHeaders(['Name', 'Email'])
     
      .process(function(contact) {
        return pump.writeRow([ contact.name, contact.email ]);
      })

    The excel workbook, worksheet and header row are created after adding ExcelWriterMixin to the pump. Each pump has a .process() callback that may transform or filter data. The callback is called for every data item of the buffer and should return a promise (we use the bluebird library) that fulfills when the data is processed. In this example, the default processing callback (which copies data to the output buffer) is overridden with one that writes rows to the excel worksheet.

  • Finally, start the pump and write to console when it's done.

    pump
      .logErrorsToConsole()
      .run()
        .then(function() {
          console.log("Done writing contacts to file");
        });

    The .logErrorsToConsole() method will log any error to the console, surprisingly. The pump will start on calling .run(), which returns a promise that resolves when the pump has finished.
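To make the mixin idea more concrete, here is a minimal sketch of how a mixin can attach methods to an object. It is not the datapumps implementation: TinyPump, GreeterMixin and .greet() are hypothetical names, and the sketch assumes that a mixin is simply a function that receives the pump and adds methods to it.

```javascript
// Hypothetical stand-in for a pump; only the mixin mechanics are modelled.
function TinyPump() {}

// Assumption: a mixin is a function that receives the target object.
TinyPump.prototype.mixin = function (mixinFn) {
  mixinFn(this);
  return this; // return the pump so that calls can be chained
};

// A mixin factory: takes configuration and returns the function
// that extends the pump with new methods.
function GreeterMixin(greeting) {
  return function (target) {
    target.greet = function (name) {
      return greeting + ', ' + name + '!';
    };
  };
}

var tinyPump = new TinyPump().mixin(GreeterMixin('Hello'));
console.log(tinyPump.greet('world'));
```

The factory form mirrors calls such as MongodbMixin('mongodb://localhost/marketing'): the configuration is captured first, and the methods are attached when the mixin is applied.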

Pump

A pump reads data from its input buffer or stream and copies it to the output buffer by default:

datapumps = require('datapumps');
(pump = new datapumps.Pump())
  .from(<put a nodejs stream or datapumps buffer here>)
  .run()

To access the output buffer, use the .buffer() method, which returns a Buffer instance:

buffer = pump.buffer('output');
buffer = pump.buffer(); // equivalent with previous as the default buffer 
                        // of the pump is called 'output' 

Use the .buffers() method when you need to write data into multiple output buffers:

ticketsPump
  .buffers({
    openTickets: ticketsPump.createBuffer(),
    closedTickets: ticketsPump.createBuffer(),
  });
 
reminderMailer = new datapumps.Pump()
reminderMailer
  .from(ticketsPump.buffer('openTickets'))
  ...

Note that the ticketsPump pump has two output buffers: openTickets and closedTickets. The reminderMailer pump reads data from the openTickets buffer of the tickets pump.

Transforming data

Use the .process() method to set the function which processes data:

ticketsPump
  .process(function(ticket) {
    ticket.title = 'URGENT: ' + ticket.title;
    return this.buffer('openTickets').writeAsync(ticket);
  });

The argument of .process() is a function that will be executed after the pump reads a data item. The function is executed in the context of the pump object, i.e. this refers to the pump itself. The function should return a Promise that fulfills when the data is processed (i.e. written into a buffer or stored elsewhere).
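Because the callback runs with the pump as this, it has to be a regular function; an arrow function would keep the this of the surrounding scope. The sketch below shows the difference with a plain object standing in for the pump. fakePump and invoke are hypothetical helpers for illustration, not part of datapumps.

```javascript
// Plain object standing in for a pump.
var fakePump = { name: 'ticketsPump' };

// Calls the callback the way the text describes: with the pump as `this`.
function invoke(pump, callback, item) {
  return callback.call(pump, item);
}

// A regular function sees the pump through `this`.
var viaFunction = invoke(fakePump, function (ticket) {
  return this.name + ' got ' + ticket;
}, 'ticket #1');

// An arrow function ignores .call() and keeps the outer `this`.
var viaArrow = invoke(fakePump, (ticket) => this === fakePump, 'ticket #1');

console.log(viaFunction); // ticketsPump got ticket #1
console.log(viaArrow);    // false
```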

Start and end of pumping

A pump is started by calling the .start() method. The end event will be emitted when the input stream or buffer ended and all output buffers became empty.

pump.on('end', function() {
  console.log('Pumped everything, and all my output buffers are empty. Bye.')
})
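The end condition has two parts, and both must hold. As a rough sketch (plain arrays stand in for output buffers, and hasEnded is a hypothetical helper rather than a datapumps function):

```javascript
// Hypothetical helper: arrays stand in for output buffers.
function hasEnded(inputEnded, outputBuffers) {
  var allEmpty = outputBuffers.every(function (buffer) {
    return buffer.length === 0;
  });
  return inputEnded && allEmpty;
}

console.log(hasEnded(true, [[], []]));          // true
console.log(hasEnded(true, [[], ['pending']])); // false: an output still holds data
console.log(hasEnded(false, [[], []]));         // false: the input may still produce items
```

In practice this means that a pump whose output buffer is never read by anything will not emit the end event.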

Pump group

You often need multiple pumps to complete an ETL task. Pump groups help to start multiple pumps in one step, and also let you handle the event when every pump has ended:

sendMails = datapumps.group();
sendMails.addPump('tickets')
  ...;
sendMails.addPump('reminderMailer')
  ...;
sendMails
  .start()
  .whenFinished().then(function() {
    console.log('Tickets processed.');
  });

The .addPump() method creates a new pump with the given name and returns it for configuration. .start() will start all pumps in the group, while .whenFinished() returns a Promise that fulfills when every pump has ended (Note: the end event is also emitted).

Encapsulation

Sometimes you wish to encapsulate a part of an ETL process and also use it elsewhere. It is possible to set an input pump and expose buffers from the group, so it will provide the same interface as a simple pump (i.e. it has .from(), .start(), .buffer() methods and emits end event).

Most likely, you want to extend datapumps.Group class (example is written in CoffeeScript):

{ Group, mixin: { MysqlMixin } } = require 'datapumps'
 
class Notifier extends Group
  constructor: ->
    super()
    @addPump 'emailLookup'
      .mixin MysqlMixin connection
      .process (data) ->
        @query('SELECT email FROM user where username = ?', [ data.username ])
          .then (result) =>
            data.emailAddress = result.email
            @buffer().writeAsync data
    @addPump 'sendMail'
      .from @pump 'emailLookup'
      .process (data) ->
        ... # send email to data.emailAddress 
        @buffer().writeAsync
          recipient:
            name: data.name
            email: data.emailAddress
 
    @setInputPump 'emailLookup'
    @expose 'output', 'sendMail/output'

The Notifier will behave like a pump, but on the inside it looks up email addresses using mysql and sends mail to those addresses. The output buffer of the sendMail pump is filled with recipient data.

Use the created class like this:

etlProcess = datapumps.group()
etlProcess
  .addPump 'notifier', new Notifier
    .from <node stream or datapumps buffer>
 
etlProcess
  .addPump 'logger'
    .from etlProcess.pump('notifier').buffer()
    .process (data) ->
      console.log "Email sent to #{data.name} (#{data.email})"

Please note that you cannot use the .process() method on a group.

Error handling

Errors may occur while data is transferred between systems. Most of the time, you don't want to stop on the first error but complete the transfer and re-run it after fixing the problems. Therefore the pump group has an error buffer (.errorBuffer()) which can hold ten error messages by default. When the error buffer fills up, the error event is triggered and the .whenFinished() promise is rejected:

group
  .start()
  .whenFinished()
    .then(function() {
      if (!group.errorBuffer().isEmpty()) {
        console.log("Transfer finished, but with errors.");
        // errors list will be at group.errorBuffer().getContent() 
      }
    })
    .catch(function() {
      console.log("Pump group failed with errors");
      // errors list will be at group.errorBuffer().getContent() 
    });
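The idea of a bounded error buffer can be sketched independently of datapumps. The class below is a hypothetical model, not the library's code: ErrorBufferSketch and its full event are made-up names, and only the capacity of ten comes from the text above.

```javascript
var EventEmitter = require('events');

// Hypothetical model of an error buffer with limited capacity.
class ErrorBufferSketch extends EventEmitter {
  constructor(capacity) {
    super();
    this.capacity = capacity;
    this.content = [];
  }

  isEmpty() { return this.content.length === 0; }
  isFull() { return this.content.length >= this.capacity; }

  write(error) {
    this.content.push(error);
    // Signal that no further errors can be collected.
    if (this.isFull()) this.emit('full');
  }
}

var errors = new ErrorBufferSketch(10); // ten matches the default mentioned above
var gaveUp = false;
errors.on('full', function () { gaveUp = true; });

for (var i = 1; i <= 9; i++) errors.write(new Error('row ' + i));
console.log(gaveUp); // false: nine errors still fit

errors.write(new Error('row 10'));
console.log(gaveUp); // true: the tenth error fills the buffer
```

The design trade-off is that a few bad records do not abort a long transfer, while a systematic failure (every record failing) stops the run after a small, fixed number of errors.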

The .logErrorsToConsole() helper method configures the pump or group to print errors when processing has finished:

group
  .logErrorsToConsole()
  .start();

The .logErrorsToLogger() helper method configures the pump or group to send errors to a logger when processing has finished:

group
  .logErrorsToLogger(logger)
  .start();

This is useful for running the ETL on a server. The logger can be any logging method that contains an .error() method such as Winston, Log4js, etc.
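If you do not use a logging library, a hand-written object may be enough, since the only stated requirement is an .error() method. The following is a minimal sketch under that assumption; the collected array and the '[etl]' prefix are illustrative choices, and the exact arguments datapumps passes to .error() are not specified here.

```javascript
// A minimal logger: the only requirement stated above is an .error() method.
var collected = [];
var logger = {
  error: function (message) {
    collected.push(String(message)); // keep a copy, e.g. for a final report
    console.error('[etl]', message);
  }
};

logger.error(new Error('Could not write row 42'));
// With datapumps, this object would be passed as group.logErrorsToLogger(logger).
```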

Debugging

The following example shows fingers-crossed logging, i.e. debug logging is turned on after the first error has occurred:

{ group } = require('datapumps')
 
(d = group())
  .addPump 'test'
    .from d.createBuffer
      sealed: true,
      content: [ 'first', 'second', 'third', 'fourth' ]
    .process (data) ->
      throw new Error 'Start debugging', data if data == 'second'
      @copy data
 
d.errorBuffer().on 'write', (data) ->
  console.log data
  d.buffer('test/output').on 'write', (data) ->
    console.log "#{data} was written to test/output buffer"
    console.log "#{data} was written to test/output buffer"
 
d.start()

The output:

{ message: [Error: Start debugging], pump: 'test' }
third was written to test/output buffer
fourth was written to test/output buffer
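The output contains neither first nor second: first is written before any listener exists, and second fails instead of being written. The sketch below reproduces that ordering with plain Node.js event emitters standing in for the two buffers. It is an illustration of the pattern only and does not use datapumps; it also uses once() so that a second error would not attach a duplicate listener.

```javascript
var EventEmitter = require('events');

var output = new EventEmitter();       // stands in for the test/output buffer
var errorChannel = new EventEmitter(); // stands in for the error buffer
var lines = [];

// Debug logging is attached only once the first error shows up.
errorChannel.once('write', function (error) {
  lines.push('error: ' + error.message);
  output.on('write', function (data) {
    lines.push(data + ' was written');
  });
});

['first', 'second', 'third', 'fourth'].forEach(function (data) {
  if (data === 'second') {
    errorChannel.emit('write', new Error('Start debugging'));
    return; // the failed item never reaches the output
  }
  output.emit('write', data);
});

console.log(lines);
```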

Mixins

The core components of datapumps are only responsible for passing data in a flow-controlled manner. The features required for import, export or transfer are provided by mixins.

When you implement new mixins, please fork datapumps and make a pull request.