riak-pb

Riak Protocol Buffers Client

riak-pb

Riak Protocol Buffer Client for Node.js.

Features:

  • streaming
  • fail-over
  • automatic retry
  • connection pooling
  • load balancing

Include riak-pb in your dependency list or install:

$ npm install riak-pb
var riak = require('riak-pb');
var client = riak();

Or, with options:

var options = {
  nodes: [
    {
      host: 'myriakhostname.acme.com',
      port: 8087 }],
  maxPool: 5,     // Maximum number of connections in the connection pool - default is 5 
  maxRetries: 10, // maximum times the client tries to reconnect before failing - default is 10 
  maxDelay: 2000, // maximum time (ms) between reconnections - reconnections have an exponential backoff, but limited to this value - default is 2000 
};
 
var client = riak(options);

The API is based on the Riak Protocol spec, check it out to find out about what arguments you need.

Examples:

client.put({
    bucket: 'test',
    key: 'test',
    content: { 
      value: '{"test":"data"}',
      content_type: 'application/json',
      indexes: [{ key: 'test_bin', value: 'test' }] 
    } 
  },
  function (errreply) {
    //... 
    console.warn("ERr: "+JSON.stringify(err)+" Reply: "+JSON.stringify(reply));
  });

With multiple indexes:

  var indexes = [{ key: 'key1_bin', value: 'value1' }, { key: 'key2_bin', value: 'value2' }];
  var options = { bucket: 'test', key: 'test-put-index', content: { value: '{"test":"data"}', content_type: 'application/json', indexes: indexes }, return_body: true };
 
  client.put(options, function(errreply) {
    //... 
  });

With vector clock:

var options = { bucket: 'test', key: 'test-vclock', content: { value: '{"test":"data"}', content_type: 'application/json' }, return_body: true };
client.put(options, function (errreply) {
  if (err) throw err;
  var options = { bucket: 'test', key: 'test-vclock', content: { value: '{"test":"data"}', content_type: 'application/json' }, return_body: true };
  options.vclock = reply.vclock;
  client.put(options, function(reply) {
    // ... 
  });
});

Example:

client.get({ bucket: 'test', key: 'test' }, function (errreply) {
  t.equal(++cbCount, 1);
  t.notOk(err, err && err.message);
  t.ok(Array.isArray(reply.content));
  t.equal(reply.content.length, 1);
  t.equal(reply.content[0].value, '{"test":"data"}');
  t.end();
});

Example:

client.getIndex({
  bucket: 'test',
  index: 'test_bin',
  qtype: 0,
  key: 'test' },
  function (errreply) {
    //... 
  });

Range query example:

client.getIndex({
  bucket: 'test',
  index: 'test_bin',
  qtype: 1,
  range_min: 'abc',
  range_max: 'abcdef' },
  function (errreply) {
    //... 
  });

Example:

client.setBucket('test', { allow_mult: true, n_val: 3 },
  function (errreply) {
    /// ... 
  });

With callback:

client.getKeys('test', function (errkeys) {
  /// ... 
});

Streaming:

var s = client.getKeys('test');
 
s.on('readable', function() {
  var key;
  while(key = s.read()) {
    console.log('got key:', key);
  }
});

With callback:

client.search({ index: 'key1_bin', q: 'test' }, function (errreply) {
  /// ... 
});

With callback:

var request = {
  inputs: 'test',
  query: [
    {
      map: {
        source: 'function (v) { return [[v.bucket, v.key]]; }',
        language: 'javascript',
        keep: true
      }
    }]};
 
var params = { request: JSON.stringify(request), content_type: 'application/json' };
 
client.mapred(params, function (errresponses) {
  /// ... 
});

Streaming:

var request = {
  inputs: 'test',
  query: [
    {
      map: {
        source: 'function (v) { return [[v.bucket, v.key]]; }',
        language: 'javascript',
        keep: true
      }
    }]};
 
var params = { request: JSON.stringify(request), content_type: 'application/json' };
 
var s = client.mapred(params);
 
s.on('readable', function() {
  var res;
  while(res = s.read()) {
    console.log('got res:', res);
  }
});
client.del('test', key, function(err) {
  // ... 
});

or, with options:

client.del('test', {key: key, vclock: vclock}, function(err) {
  // ... 
});

Queues a disconnect after all the pending requests are complete

client.disconnect();
  • getBuckets(callback) // callback(err, buckets)
  • getBucket(bucket[, callback]) // callback(err, bucketInfo)
  • setBucket(bucket, props[, callback]) // callback(err)
  • setClientId (client_id[, callback]) // callback(err)
  • getClientId (callback) // callback(err, clientId)
  • ping (callback) // callback(err)
  • getServerInfo(callback) // callback(err, reply)

The client object emits these events:

  • 'error' - (err)
  • 'warning' - (warning) - Emitted when there is an internal error, like a disconnection. In this case, the client will transparently attempt to reconnect (up to a limit of attempts) and a "warning" will be emitted with the underlying error object.