Streaming SQL ORM

# streamsql

A streaming, backend-agnostic SQL ORM heavily inspired by levelup.


$ npm install streamsql

You will also need to install either mysql or sqlite3 depending on which driver you plan on using:

# EITHER: mysql driver
$ npm install mysql
# OR: sqlite3 driver
$ npm install sqlite3





### base.connect(options)

Establishes a database connection.

options.driver can either be mysql or sqlite3.

Super Important Note

streamsql loads drivers on demand and does not include them as production dependencies. You will need to have either mysql (tested against 2.0.0-alpha9) or sqlite3 (tested against 2.1.19) in your package.json in addition to streamsql.
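
The on-demand loading described above could look roughly like the following. This is a minimal sketch, not streamsql's actual loader: `loadDriver`, `SUPPORTED_DRIVERS`, the injectable `requireFn` parameter, and the error messages are all hypothetical names chosen for illustration.

```javascript
// Hypothetical helper: resolve a driver module only when it is requested.
const SUPPORTED_DRIVERS = ['mysql', 'sqlite3']

function loadDriver(name, requireFn) {
  if (SUPPORTED_DRIVERS.indexOf(name) === -1) {
    throw new Error('Unknown driver: ' + name)
  }
  try {
    // `requireFn` defaults to Node's require; it is injectable for testing.
    return (requireFn || require)(name)
  } catch (err) {
    // The module is not installed, so point the user at the fix.
    throw new Error(
      'Driver "' + name + '" is not installed. Run: npm install ' + name
    )
  }
}
```

Because the driver is only required when it is asked for, a project that uses sqlite3 never needs the mysql module on disk, and vice versa.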

mysql options

See the documentation for the mysql module for full details. The options object will be passed over to that.

const streamsql = require('streamsql')
const db = streamsql.connect({
  driver: 'mysql',
  user: process.env['DB_USER'],
  password: process.env['DB_PASSWORD'],
  database: 'music'
})

sqlite3 options

Takes just one option, options.filename. This can be set to :memory: for an in-memory database.

const streamsql = require('streamsql')
const db = streamsql.connect({
  driver: 'sqlite3',
  filename: ':memory:'
})

Returns a db object.

### db.table(localName, definition)

Registers a table against the internal table cache. Note, this does not create the table in the database (nor does it run any SQL at all).

localName is the name the table will be registered under. You can use this later with db.table() to get a handle for the table.

definition is an object that accepts the following keys:

  • primaryKey: the primary key for the table. Defaults to id

  • tableName: the name of the table in the actual database. Defaults to localName

  • fields: an array representing all the fields this table has. Example: ['id', 'first_name', 'last_name', 'created_at']

  • methods: (optional) methods to add to a row object as it gets emitted from the database (when using the default constructor). this in the function context will be a reference to the row. Example:

db.table('friendship', {
  fields: [ 'id', 'screen_name', 'friend' ],
  methods: {
    hifive: function hifive() {
      return this.screen_name + ' deserves a hifive!'
    }
  }
})

  • constructor: (optional) method to call when creating a row object as it gets emitted from the database. The default constructor, which returns the data combined with any given methods, should be sufficient for most scenarios. Example:

function Friendship (data) {
  this.screen_name = data.screen_name
  this.friend = data.friend
}

Friendship.prototype.hifive = function () {
  return this.screen_name + ' deserves a hifive!'
}

db.table('friendship', {
  fields: [ 'id', 'screen_name', 'friend' ],
  constructor: Friendship
})
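
To make the default behaviour more concrete, here is a minimal sketch of how raw data and `methods` might be combined into a row. `createRow` is a hypothetical helper, the sample values are invented, and placing the methods on the prototype is an assumption rather than a description of streamsql's internals.

```javascript
// Sketch: build a row object from raw data plus the table's `methods`.
function createRow(data, methods) {
  // Assumption: methods live on the prototype so they are not own columns.
  const row = Object.create(methods || {})
  Object.keys(data).forEach(function (key) {
    row[key] = data[key]
  })
  return row
}

const methods = {
  hifive: function hifive() {
    return this.screen_name + ' deserves a hifive!'
  }
}

// Invented sample data, for illustration only.
const row = createRow({ id: 1, screen_name: 'brian', friend: 'mike' }, methods)
console.log(row.hifive()) // prints "brian deserves a hifive!"
```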

#### options.relationships

You can define relationships on the data coming out of createReadStream, get, or getOne. hasOne relationships will translate to JOINs at the SQL layer, and hasMany relationships will perform an additional query.

options.relationships is an object, keyed by property. The property name will be used when attaching the foreign rows to the main row.

  • type: Either "hasOne" or "hasMany".
  • foreign: Definition for the right side of the join.
    • table: The name of the table. This should be the name you used to register the table with db.table.
    • as: How to alias the table when performing the join. This is mostly useful when doing a self-join on a table so you don't get an ambiguity error. Defaults to the name of the table.
    • key: The foreign key to use.
  • local: Definition for the left side of the join. If you're just joining on a key normally found in the current table, this can be a string. If you are doing a cascading join (i.e., joining against a field acquired from a different join) you can use an object here:
    • table: The name of the table. Important: if you aliased the table with as, use the alias here.
    • key: Key to use
  • via: Used for many-to-many relationships, where a third table is required to maintain data associations:
    • table: The name of the linking table, as registered with db.table.
    • local: The key in the linking table associated with the local table.
    • foreign: The key in the linking table associated with the foreign table.
  • optional: Whether or not the relationship is optional (INNER vs LEFT join). Defaults to false.

The results of the fulfilled relationship will be attached to the main row by their key in the relationships object. All foreign items will have their methods as you defined them when setting up the table with db.table, or use their configured constructor where applicable.


band table

id | name          | founded | disbanded
 1 | Squirrel Bait |    1983 |      1988
 2 | Slint         |    1986 |      1992

album table

id | bandId | name          | released
 1 |      1 | Squirrel Bait |     1985
 2 |      1 | Skag Heaven   |     1987
 3 |      2 | Tweez         |     1989
 4 |      2 | Spiderland    |     1991

member table

id | firstName | lastName
 1 | Brian     | McMahon
 2 | David     | Pajo
 3 | Todd      | Brashear
 4 | Britt     | Walford

bandMember table

id | bandId | memberId
 1 |      1 |        1
 2 |      1 |        4
 3 |      2 |        1
 4 |      2 |        2
 5 |      2 |        3
 6 |      2 |        4

const band = db.table('band', {
  fields: [ 'name', 'founded', 'disbanded' ],
  relationships: {
    albums: {
      type: 'hasMany',
      local: 'id',
      foreign: { table: 'album', key: 'bandId' }
    },
    members: {
      type: 'hasMany',
      local: 'id',
      foreign: { table: 'member', key: 'id' },
      via: { table: 'bandMember', local: 'bandId', foreign: 'memberId' }
    }
  }
})

const album = db.table('album', {
  fields: [ 'bandId', 'name', 'released' ]
})

const member = db.table('member', {
  fields: [ 'firstName', 'lastName' ],
  relationships: {
    bands: {
      type: 'hasMany',
      local: 'id',
      foreign: { table: 'band', key: 'id' },
      via: { table: 'bandMember', local: 'memberId', foreign: 'bandId' }
    }
  }
})

const bandMember = db.table('bandMember', {
  fields: [ 'bandId', 'memberId' ]
})

// NOTE: for efficiency, relationships are not automatically populated.
// You must pass { relationships: true } to fulfill the relationships
// defined on the table at time of `get` or `createReadStream`.
band.get({}, {
  debug: true,
  relationships: true
}, function (err, rows) {
  console.dir(rows)
})

Will result in:

[ { id: 1,
    name: 'Squirrel Bait',
    founded: 1983,
    disbanded: 1988,
    albums:
     [ { id: 1, bandId: 1, name: 'Squirrel Bait', released: 1985 },
       { id: 2, bandId: 1, name: 'Skag Heaven', released: 1987 } ],
    members:
     [ { id: 1, firstName: 'Brian', lastName: 'McMahon' },
       { id: 4, firstName: 'Britt', lastName: 'Walford' } ] },
  { id: 2,
    name: 'Slint',
    founded: 1986,
    disbanded: 1992,
    albums:
     [ { id: 3, bandId: 2, name: 'Tweez', released: 1989 },
       { id: 4, bandId: 2, name: 'Spiderland', released: 1991 } ],
    members:
     [ { id: 1, firstName: 'Brian', lastName: 'McMahon' },
       { id: 2, firstName: 'David', lastName: 'Pajo' },
       { id: 3, firstName: 'Todd', lastName: 'Brashear' },
       { id: 4, firstName: 'Britt', lastName: 'Walford' } ] } ]

Returns a table object.
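
For intuition about what a hasMany relationship with via produces, the following sketch resolves one in memory using rows from the sample tables. `attachVia` is a hypothetical helper written for this illustration; streamsql itself performs this work with SQL queries, and nothing here should be read as its implementation.

```javascript
// Sample rows copied from the band, member and bandMember tables.
const bands = [
  { id: 1, name: 'Squirrel Bait' },
  { id: 2, name: 'Slint' }
]
const members = [
  { id: 1, firstName: 'Brian', lastName: 'McMahon' },
  { id: 2, firstName: 'David', lastName: 'Pajo' },
  { id: 3, firstName: 'Todd', lastName: 'Brashear' },
  { id: 4, firstName: 'Britt', lastName: 'Walford' }
]
const bandMembers = [
  { bandId: 1, memberId: 1 }, { bandId: 1, memberId: 4 },
  { bandId: 2, memberId: 1 }, { bandId: 2, memberId: 2 },
  { bandId: 2, memberId: 3 }, { bandId: 2, memberId: 4 }
]

// Hypothetical helper: attach foreign rows reached through a linking table.
function attachVia(rows, property, rel, linkRows, foreignRows) {
  return rows.map(function (row) {
    // Collect the foreign keys linked to this row's local key.
    const foreignKeys = linkRows
      .filter(function (link) { return link[rel.via.local] === row[rel.local] })
      .map(function (link) { return link[rel.via.foreign] })
    const copy = Object.assign({}, row)
    copy[property] = foreignRows.filter(function (f) {
      return foreignKeys.indexOf(f[rel.foreign.key]) !== -1
    })
    return copy
  })
}

const rel = {
  type: 'hasMany',
  local: 'id',
  foreign: { table: 'member', key: 'id' },
  via: { table: 'bandMember', local: 'bandId', foreign: 'memberId' }
}
const result = attachVia(bands, 'members', rel, bandMembers, members)
console.log(result[0].members.map(function (m) { return m.lastName }))
// prints [ 'McMahon', 'Walford' ]
```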

### db.table(localName)

Return a previously registered table. If the table is not in the internal cache, db.table will throw an error.

Returns a table object.
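
The two call shapes of db.table (register with a definition, look up without one) can be pictured with a small in-memory cache. This is a sketch under assumptions: `createRegistry` and the error message are hypothetical, and only the documented defaults for primaryKey and tableName are modelled.

```javascript
// Sketch of a table cache: one call shape registers, the other looks up.
function createRegistry() {
  const tables = {}
  return function table(localName, definition) {
    if (definition === undefined) {
      // Lookup: throw when the table was never registered.
      if (!tables[localName]) {
        throw new Error('No table registered under "' + localName + '"')
      }
      return tables[localName]
    }
    // Registration: apply the documented defaults. No SQL is run.
    tables[localName] = {
      primaryKey: definition.primaryKey || 'id',
      tableName: definition.tableName || localName,
      fields: definition.fields || []
    }
    return tables[localName]
  }
}

const table = createRegistry()
table('band', { fields: ['name', 'founded', 'disbanded'] })
console.log(table('band').primaryKey) // prints "id"
```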

### table.put(row, [options, [callback]])

Inserts or updates a single row. If callback is not provided, returns a promise.

An insert will always be attempted first. If the insert fails with a duplicate entry error (as tested by the specific driver implementation) and the row contains the table's primaryKey, an update will be attempted. This behavior can be changed with the uniqueKey option; see below.

callback will receive two arguments: err and result. result should have three properties: row, sql, and insertId.

If the result of a put() is an update, the result will have affectedRows instead of insertId.
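
The insert-then-update flow can be sketched against a fake driver. Everything here is an assumption made for illustration: `createFakeDriver`, its `insert`/`update` methods, the `duplicate` flag on the error, and the `put` helper are hypothetical, and the result omits the `sql` property that the real result carries.

```javascript
// Fake in-memory driver standing in for mysql/sqlite3 (illustration only).
function createFakeDriver() {
  const rows = {}
  let nextId = 1
  return {
    insert: function (row, callback) {
      if (row.id !== undefined && rows[row.id]) {
        const err = new Error('duplicate entry')
        err.duplicate = true
        return callback(err)
      }
      const id = row.id !== undefined ? row.id : nextId
      nextId = Math.max(nextId, id + 1)
      rows[id] = Object.assign({}, row, { id: id })
      callback(null, { insertId: id })
    },
    update: function (row, callback) {
      rows[row.id] = Object.assign({}, rows[row.id], row)
      callback(null, { affectedRows: 1 })
    }
  }
}

// Insert first; fall back to an update on a duplicate primary key.
function put(driver, primaryKey, row, callback) {
  driver.insert(row, function (err, meta) {
    if (!err) return callback(null, { row: row, insertId: meta.insertId })
    const canUpdate = err.duplicate && row[primaryKey] !== undefined
    if (!canUpdate) return callback(err)
    driver.update(row, function (err, meta) {
      if (err) return callback(err)
      callback(null, { row: row, affectedRows: meta.affectedRows })
    })
  })
}
```

Calling `put` twice with the same primary key yields a result with insertId the first time and a result with affectedRows the second time.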

#### options

  • uniqueKey: This option changes the way a put() turns into an update(). Instead of checking for a duplicate primary key error, it will check for duplicate unique key errors. For example, if you have a table with UNIQUE KEY (firstName,lastName), you can do:

    table.put({
      firstName: 'John',
      lastName: 'Doe',
      age: 33
    }, {uniqueKey: ['John', 'Doe']})

### table.get(conditions, [options, [callback]])
### table.getOne(conditions, [options, [callback]])
### table.getAll([options, [callback]])

Gets all, some, or one row from the table. If callback is omitted, returns a promise.


// Get all rows, promise style...
const getAlbums = albums.getAll()
getAlbums.then(function (albums) {
  // do stuff with albums
}).catch(function (error) {
  // handle errors
})

// ...or use callback style
albums.getAll(function (error, albums) {
  if (error) {
    // handle errors
    return
  }
  // do stuff with albums
})

#### conditions

conditions can take any of several forms, depending on how you're trying to select rows.

Simple, uses = for comparison:

albums.get({ artist: 'Hookworms' }, function(err, rows){ ... })

Explicit comparison operation:

albums.get({
  artist: {
    value: 'Hookworms',
    operation: '='
  },
  release_year: {
    operation: '<=',
    value: 2012
  }
}, function(err, rows){ ... })

Implicit IN comparison:

albums.get({
  artist: [
    'My Bloody Valentine'
  ]
}, function(err, rows){ ... })

Multiple conditions on a single column:

albums.get({
  artist: 'David Bowie',
  release_year: [{
    operation: '>=',
    value: 1976
  }, {
    operation: '<=',
    value: 1978
  }]
}, function(err, rows){ ... })

Simple OR queries

All of the examples above are inclusive – the where statement is joined with AND – so the row must match all of the parameters to be included. However, by passing in multiple conditions in an array, it is possible to generate an OR query.

albums.get([{
  artist: ['Queen', 'Pink Floyd'],
  release_year: 1975
}, {
  artist: ['Electric Light Orchestra', 'Led Zeppelin'],
  release_year: 1973
}], function(err, rows){ ... })
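
To show how the condition forms above could map onto SQL, here is a minimal sketch of a WHERE-clause compiler. It is not streamsql's implementation: `compileColumn` and `compileWhere` are hypothetical names, the raw SQL form is not handled, and the operation string is interpolated unchecked, which a real implementation would need to validate.

```javascript
// Compile one column's condition into SQL text plus bound values.
function compileColumn(column, condition) {
  const name = '`' + column + '`'
  if (Array.isArray(condition)) {
    const isList = condition.every(function (c) { return typeof c !== 'object' })
    if (isList) {
      // Array of plain values: implicit IN comparison.
      const marks = condition.map(function () { return '?' }).join(', ')
      return { sql: name + ' IN (' + marks + ')', values: condition.slice() }
    }
    // Array of { operation, value }: several conditions on one column.
    const parts = condition.map(function (c) { return compileColumn(column, c) })
    return {
      sql: parts.map(function (p) { return p.sql }).join(' AND '),
      values: [].concat.apply([], parts.map(function (p) { return p.values }))
    }
  }
  if (condition !== null && typeof condition === 'object') {
    // Explicit comparison operation.
    return { sql: name + ' ' + condition.operation + ' ?', values: [condition.value] }
  }
  // Simple form: equality.
  return { sql: name + ' = ?', values: [condition] }
}

// An object is joined with AND; an array of objects is joined with OR.
function compileWhere(conditions) {
  if (Array.isArray(conditions)) {
    const groups = conditions.map(function (c) { return compileWhere(c) })
    return {
      sql: groups.map(function (g) { return '(' + g.sql + ')' }).join(' OR '),
      values: [].concat.apply([], groups.map(function (g) { return g.values }))
    }
  }
  const parts = Object.keys(conditions).map(function (column) {
    return compileColumn(column, conditions[column])
  })
  return {
    sql: parts.map(function (p) { return p.sql }).join(' AND '),
    values: [].concat.apply([], parts.map(function (p) { return p.values }))
  }
}

const where = compileWhere({
  artist: 'David Bowie',
  release_year: [{ operation: '>=', value: 1976 }, { operation: '<=', value: 1978 }]
})
console.log(where.sql)
// prints: `artist` = ? AND `release_year` >= ? AND `release_year` <= ?
```

Keeping the values separate from the SQL text lets the driver bind them as parameters instead of concatenating user input into the query.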

Raw SQL

There's a final option that lets you do whatever you want. Note, you can use $table as a placeholder for the current table so you don't have to hardcode it.

albums.get([
  'SELECT `release_date` AS `date` FROM $table WHERE `title` = ? AND `artist`= ?',
  ['Siamese Dream', 'The Smashing Pumpkins']
], function(err, rows){ ... })
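
The $table placeholder amounts to a string substitution before the query is sent. A minimal sketch, assuming the table name is wrapped in backticks (that quoting choice and the `expandTable` name are assumptions, not confirmed library behaviour):

```javascript
// Sketch: substitute the $table placeholder with a quoted table name.
function expandTable(sql, tableName) {
  const quoted = '`' + tableName + '`'
  // A function replacement avoids special "$" patterns in the replacement.
  return sql.replace(/\$table\b/g, function () { return quoted })
}

console.log(expandTable('SELECT * FROM $table WHERE `title` = ?', 'album'))
// prints: SELECT * FROM `album` WHERE `title` = ?
```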

#### options

  • include: Fields to select from the database. Any fields not in this list will not be included. Note, the primary key will always be included. By default, everything listed in table.fields will be included.
  • exclude: Fields in this list will not be selected from the database. If both include and exclude are defined, include is always preferred.
  • relationships: Either a boolean or a set of relationship definitions.
  • relationshipsDepth: Depth of relationships to fulfil; the default is 1 - that is, only the relationships of the requested object are returned. -1 will attempt to retrieve as many relationships as is reasonably possible.
  • sort: Can be one of three forms:
    • Implicit ascending, single column: {sort: 'artist'}
    • Implicit ascending, multiple columns: {sort: ['artist', 'release_date']}
    • Explicit: {sort: { artist: 'desc', release_date: 'asc'}}
  • limit and page: How many rows and which page of results to get. Example: {limit: 25, page: 3}
  • includeTotal: Instead of returning an array of rows, return an object that includes rows and total, where total is the total number of rows. This is useful when using limit and page.
  • debug: When set to true, the generated SQL statement will be printed to stderr.
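
The three sort forms listed above can be normalized into a single ORDER BY clause. The following is a sketch only: `compileSort` is a hypothetical helper, and the backtick quoting and uppercase direction keywords are assumptions about the generated SQL.

```javascript
// Normalize the three documented `sort` forms into an ORDER BY clause.
function compileSort(sort) {
  let pairs
  if (typeof sort === 'string') {
    // Implicit ascending, single column.
    pairs = [[sort, 'asc']]
  } else if (Array.isArray(sort)) {
    // Implicit ascending, multiple columns.
    pairs = sort.map(function (column) { return [column, 'asc'] })
  } else {
    // Explicit direction per column; anything other than "desc" is ascending.
    pairs = Object.keys(sort).map(function (column) {
      const dir = String(sort[column]).toLowerCase() === 'desc' ? 'desc' : 'asc'
      return [column, dir]
    })
  }
  return 'ORDER BY ' + pairs.map(function (pair) {
    return '`' + pair[0] + '` ' + pair[1].toUpperCase()
  }).join(', ')
}

console.log(compileSort({ artist: 'desc', release_date: 'asc' }))
// prints: ORDER BY `artist` DESC, `release_date` ASC
```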

### table.del(conditions, [options, [callback]])

Deletes rows from the database. If callback is omitted, returns a promise.

Be careful – you can truncate an entire table with this command.

garbage.del({}, function(err){
  // garbage is now empty.
})

  • conditions: see above
  • options:
    • limit: maximum number of rows to delete

### table.createReadStream(conditions, options)

Create a ReadStream for the table.

pause() and resume()

pause() and resume() will attempt to operate on the underlying connection when applicable (such as with the mysql driver).

Events

  • data: Receives one argument, row.
  • error: If there is an error, it will be emitted here.
  • end: When the stream is complete.
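
Consuming these events looks the same as with any object-mode Node stream. The sketch below uses `Readable.from` from Node's stream module as a stand-in for table.createReadStream, so it runs without a database; `fakeReadStream` and `collect` are hypothetical helpers.

```javascript
const { Readable } = require('stream')

// Stand-in for table.createReadStream(): an object-mode stream of rows.
function fakeReadStream(rows) {
  return Readable.from(rows)
}

// Gather every row, then report through a Node-style callback.
function collect(stream, callback) {
  const seen = []
  stream.on('data', function (row) { seen.push(row) })
  stream.on('error', function (err) { callback(err) })
  stream.on('end', function () { callback(null, seen) })
}

collect(
  fakeReadStream([{ id: 1, name: 'Squirrel Bait' }, { id: 2, name: 'Slint' }]),
  function (err, rows) {
    if (err) throw err
    console.log(rows.length) // prints 2
  }
)
```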

### table.createKeyStream(conditions)

Emits a data event for each row with just the primary key of that row.

See above for the definition of conditions.

### table.createWriteStream(options)

Creates a WriteStream to the table.

The write() method on the stream takes row data. When a row is successfully written, a meta event is emitted and passed a meta object containing row, sql, and insertId.

An internal buffer is not kept, so every call to write() will return false to signal a ReadStream to pause(). Once a row has been successfully stored, a drain event will be emitted.

If options.ignoreDupes is set, any duplicate key errors will be ignored instead of emitting an error event. Ignored rows will be emitted as dupe events.
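
The write/drain contract described above can be reproduced with a plain Node Writable, which helps when testing code that pipes into a table. This is a sketch, not streamsql's stream: `fakeWriteStream` is hypothetical, the store is an array, and the meta object here carries only row and insertId.

```javascript
const { Writable } = require('stream')

// Stand-in for table.createWriteStream(): write() reports "full" after
// every row, and 'drain' follows once the row has been stored.
function fakeWriteStream(stored) {
  let nextId = 1
  return new Writable({
    objectMode: true,
    highWaterMark: 1, // a single pending row already counts as full
    write: function (row, encoding, done) {
      const stream = this
      // Simulate an asynchronous insert.
      setImmediate(function () {
        stored.push(row)
        stream.emit('meta', { row: row, insertId: nextId++ })
        done()
      })
    }
  })
}

const stored = []
const ws = fakeWriteStream(stored)
ws.on('meta', function (meta) { console.log('stored row', meta.insertId) })
ws.on('drain', function () { console.log('ready for the next row') })
const ok = ws.write({ name: 'Slint' })
console.log(ok) // prints false
```

Because write() returns false, a source piped into the stream is paused until the drain event fires, so rows are handed over one at a time.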


const ws = band.createWriteStream()

ws.on('error', function (err) {
  console.log('Oh my!', err)
})

ws.on('close', function () {
  // every row has been written
})

ws.write({ name: 'Brian', instrument: 'bass', food: 'burritos' })
ws.write({ name: 'Jeremy', instrument: 'drums', food: 'cheese' })
ws.write({ name: 'Travis', instrument: 'tambourine', food: 'tofu' })
ws.end()



Copyright (c) 2013 Brian J. Brennan
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.