node-cassandra-cql

0.4.4 • Public • Published

## Node.js CQL Driver for Apache Cassandra

Node.js CQL driver for Apache Cassandra with a small dependency tree written in pure javascript.

The driver uses Cassandra's binary protocol which was introduced in Cassandra version 1.2.

Installation

$ npm install node-cassandra-cql

Build Status

Features

  • Connection pooling to multiple hosts
  • Load balancing and automatic failover
  • Plain Old Javascript: no need to generate thrift files
  • Long and uuid support
  • Row and field streaming

Using it

//Creating a new connection pool to multiple hosts.
var cql = require('node-cassandra-cql');
var client = new cql.Client({hosts: ['host1:9042', 'host2:9042'], keyspace: 'keyspace1'});
//Reading
client.execute('SELECT key, email, last_name FROM user_profiles WHERE key=?', ['jbay'],
  function(err, result) {
    if (err) console.log('execute failed');
    else console.log('got user profile with email ' + result.rows[0].email);
  }
);
 
//Writing
client.execute('UPDATE user_profiles SET birth=? WHERE key=?', [new Date(1950, 5, 1), 'jbay'],
  cql.types.consistencies.quorum,
  function(err) {
    if (err) console.log('failure');
    else console.log('success');
  }
);
 
//Streaming query rows
client.eachRow('SELECT event_time, temperature FROM temperature WHERE station_id=', ['abc'],
  function(n, row) {
    //the callback will be invoked per each row as soon as they are received
    console.log('temperature value', n, row.temperature);
  },
  function (err, rowLength) {
    if (err) console.log('Oh dear...');
    console.log('%d rows where returned', rowLength);
  }
);
 
//Streaming field
client.streamField('SELECT key, photo FROM user_profiles WHERE key=', ['jbay'], 
  function(err, row, photoStream) {
    //the callback will be invoked per each row as soon as they are received.
    if (err) console.log('Shame...');
    else {
      //The stream is a Readable Stream2 object
      stdout.pipe(photoStream);
    }
  }
);
 
//The whole result set as a stream
client.stream('SELECT time1, value1 FROM timeseries WHERE key=', ['key123'])
  .on('readable', function () {
    //readable is emitted as soon a row is received and parsed
    var row;
    while (row = this.read()) {
      console.log('time %s and value %s', row.time1, row.value1);
    }
  })
  .on('end', function () {
    //stream ended, there aren't any more rows
  })
  .on('error', function (err) {
    //Something went wrong: err is a response error from Cassandra
  });

API

Client

The Client maintains a pool of opened connections to the hosts to avoid several time-consuming steps that are involved with the setup of a CQL binary protocol connection (socket connection, startup message, authentication, ...).

The Client is the recommended driver class to interact with Cassandra nodes.

new Client(options)

Constructs a new client object.

options is an object with these slots, only hosts is required:

                hosts: Array of string in host:port format. Port is optional (default 9042).
             keyspace: Name of keyspace to use.
             username: User for authentication.
             password: Password for authentication.
            staleTime: Time in milliseconds before trying to reconnect to a node.
    maxExecuteRetries: Maximum amount of times an execute can be retried
                       using another connection, in case the server is unhealthy.
getAConnectionTimeout: Maximum time in milliseconds to wait for a connection from the pool.
             poolSize: Number of connections to open for each host (default 1)

client.connect([callback])

Connects / warms up the pool.

It ensures the pool is connected. It is not required to call it, internally the driver will call to connect when executing a query.

The optional callback parameter will be executed when the pool is connected. If the pool is already connected, it will be called instantly.

client.execute(query, [params], [consistency], callback)

Executes a CQL query.

The query is the cql query to execute, with ? placeholders as parameters.

Use one of the values defined in types.consistencies for consistency, defaults to quorum.

Callback should take two arguments err and result.

The driver will replace the placeholders with the params, stringified into the query.

client.executeAsPrepared(query, [params], [consistency], callback)

Prepares (the first time) and executes the prepared query.

To execute a prepared query, the params are binary serialized. Using prepared statements increases performance, especially for repeated queries.

In the case the query is already being prepared on a host, it queues the executing of a prepared statement on that host until the preparing finished (the driver will not issue a request to prepare statement more than once).

Use one of the values defined in types.consistencies for consistency, defaults to quorum.

Callback should take two arguments err and result.

client.eachRow(query, [params], [consistency], rowCallback, endCallback)

Prepares (the first time), executes the prepared query and streams the rows as soon as they are received.

It executes rowCallback(n, row) per each row received, where n is the index of the row.

It executes endCallback(err, rowLength) when all rows have been received or there is an error retrieving the row.

Use one of the values defined in types.consistencies for consistency, defaults to quorum.

client.streamField(query, [params], [consistency], rowCallback, [endCallback])

Prepares (the first time), executes the prepared query and streams the last field of each row.

It executes rowCallback(n, row, streamField) per each row as soon as the first chunk of the last field is received, where n is the index of the row.

The stream is a Readable Streams2 object that contains the raw bytes of the field value. It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).

The row object is similar to the one provided on eachRow, except that it does not contain the definition of the last column.

Use one of the values defined in types.consistencies for consistency, defaults to quorum.

It executes endCallback(err, rowLength) when all rows have been received or there is an error retrieving the row.

client.stream(query, [params], [consistency], [callback])

Returns a Readable Streams2 object in objectMode. When a row can be read from the stream, it will emit a readable event. It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).

Prepares (the first time), executes the prepared query.

Use one of the values defined in types.consistencies for consistency, defaults to quorum.

It executes callback(err) when all rows have been received or there is an error retrieving the row.

client.shutdown([callback])

Disconnects the pool.

Closes all connections in the pool. Normally, it should be called once in your application lifetime.

The optional callback parameter will be executed when the pool is disconnected.


Connection

In the case that you need lower level fine-grained control you could use the Connection class.

It represents a connection to a Cassandra node. The consumer has to take care of open and close it.

new Connection(options)

Constructs a new connection object.

open(callback)

Establishes a connection, authenticates and sets a keyspace.

close(callback)

Closes the connection to a Cassandra node.

execute(query, args, consistency, callback)

Executes a CQL query.

prepare(query, callback)

Prepares a CQL query.

executePrepared(queryId, args, consistency, callback)

Executes a previously prepared query (determined by the queryId).


types

The types module contains field definitions that are useful to interact with Cassandra nodes.

consistencies

Object that contains the CQL consistencies defined as properties. For example: consistencies.one, consistencies.quorum, ...

dataTypes

Object that contains all the CQL data types defined as properties.

responseErrorCodes

Object containing all the possible response error codes returned by Cassandra defined as properties.

Long()

Constructs a 64-bit two's-complement integer. See Long API Documentation.

timeuuid()

Function to generate a uuid v1. It uses node-uuid module to generate and accepts the same arguments.

uuid()

Function to generate a uuid v4. It uses node-uuid module to generate and accepts the same arguments.


Logging

Instances of Client() and Connection() are EventEmitter's and emit log events:

client.on('log', function(level, message) {
  console.log('log event: %s -- %j', level, message);
});

The level being passed to the listener can be info or error.

Data types

Cassandra's bigint data types are parsed as Long.

List / Set datatypes are encoded from / decoded to Javascript Arrays.

Map datatype are encoded from / decoded to Javascript objects with keys as props.

Decimal and Varint are not parsed yet, they are yielded as byte Buffers.

Check the documentation for data type support →

FAQ

How can specify the target data type of a query parameter?

The driver tries to guess the target data type, if you want to set the target data type use a param object with the hint and value properties.

All the cassandra data types are defined in the object types.dataTypes.

For example:

//hint as string
var keyParam = {value: key, hint: 'int'};
client.executeAsPrepared('SELECT * from users where k=?', [keyParam], callback);
 
//hint using dataTypes
var keyParam = {value: key, hint: types.dataTypes.int};
client.executeAsPrepared('SELECT * from users where k=?', [keyParam], callback);

Should I shutdown the pool after executing a query?

No, you should only call client.shutdown once in your application lifetime.

License

node-cassandra-cql is distributed under the MIT license.

Contributions

Feel free to join in to help this project grow!

Check the Issue tracker, there are issues even marked "New Contributors Welcome" :)

Acknowledgements

FrameReader and FrameWriter are based on node-cql3's FrameBuilder and FrameParser.

Package Sidebar

Install

npm i node-cassandra-cql

Weekly Downloads

194

Version

0.4.4

License

none

Last publish

Collaborators

  • jorgebay