Overview
A basic Riak client that is fully streaming.
Installation
npm install -S riaks
Usage
The client supports both the Riak HTTP interface and protocol buffers. Specify the interface you wish to use via the protocol parameter in the configuration options. The API of the resulting client object is the same regardless of which protocol is used.
- HTTP client

    var Client = require('riaks')
    var opts = {
      host: 'localhost',
      protocol: 'http',
      port: 8098,
      timeout: 10000 // optional
    }
    var client = new Client(opts)
- Protocol buffer client. Use the protocol: 'protobuf' setting.

    var Client = require('riaks')
    var opts = {
      host: 'localhost',
      protocol: 'protobuf',
      port: 8087,
      timeout: 10000 // optional
    }
    var client = new Client(opts)
- HTTPS client. If you want all traffic to be encrypted over HTTPS, use the protocol: 'https' setting.

    var Client = require('riaks')
    var opts = {
      host: 'localhost',
      protocol: 'https',
      port: 443,
      timeout: 10000 // optional
    }
    var client = new Client(opts)
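The three configurations above differ only in their protocol and port values. As a rough sketch, the shared shape could be generated by a small helper. Note that buildOpts and DEFAULT_PORTS are hypothetical names introduced for illustration and are not part of riaks; the port numbers are simply the ones used in the examples above.

```javascript
// Hypothetical helper, not part of riaks: builds a client options object.
// Port numbers are the ones used in the examples above.
var DEFAULT_PORTS = {
  http: 8098,
  protobuf: 8087,
  https: 443
}

function buildOpts(protocol, host) {
  if (!Object.prototype.hasOwnProperty.call(DEFAULT_PORTS, protocol)) {
    throw new Error('unsupported protocol: ' + protocol)
  }
  return {
    host: host || 'localhost',
    protocol: protocol,
    port: DEFAULT_PORTS[protocol],
    timeout: 10000 // optional
  }
}

console.log(buildOpts('protobuf'))
```

The resulting object has the same shape as the opts objects shown above, so switching protocols only requires changing one argument.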
After the client is created, you can verify that the connection to the Riak server is valid by calling the connect method on the client. This returns a promise which is resolved after the client successfully communicates with the Riak server and gets a response back.

    var promise = client.connect()
    promise.then(function () {
      console.log('connected to riak')
    }, function (err) {
      console.log('failed to connect to riak')
      console.log(err)
      throw err
    })
API
Once you have a client object, the following api is available
bucketKeys
Get all keys from a bucket (returns a promise). According to Riak, this should not be used in production because it is very slow.

    var opts = {
      bucket: 'test_bucket_name' // name of the bucket to look in
    }
    var promise = client.bucketKeys(opts)
bucketKeysStream
Get all the keys in a bucket, but stream them back as they arrive from Riak.

    var opts = {
      bucket: 'test_bucket_name' // name of the bucket to look in
    }
    var keyStream = client.bucketKeysStream(opts)
    keyStream.on('data', function (key) {
      console.log(key)
    })
bucketStream
Get a list of all buckets and stream back the bucket names as they arrive from Riak.

    var bucketStream = client.bucketStream()
    bucketStream.on('data', function (bucketName) {
      console.log(bucketName)
    })
bucketDeleteAll
Delete all keys in a bucket (returns a promise).

    var opts = {
      bucket: 'test_bucket', // name of bucket
      concurrency: 5 // optional, limits max number of simultaneous delete requests to riak
    }
    var promise = client.bucketDeleteAll(opts)
getWithKey
Get the value for a key (returns a promise). If the object was saved with content-type: application/json, the client will call JSON.parse and return an actual JavaScript object.

    var opts = {
      bucket: 'test_bucket',
      key: 'test_key'
    }
    var promise = client.getWithKey(opts)
If you need both the value and the secondary index key/value pairs, specify the returnMeta: true parameter.

    var opts = {
      bucket: 'test_bucket',
      key: 'test_key',
      returnMeta: true // optional, return 2i indexes, headers, etc
    }
    var promise = client.getWithKey(opts)
In the example above, the reply from getWithKey will look like

    {
      value: { foo: 'bar' }, // actual javascript object but stored as json in riak
      indices: [
        { key: 'first_index_bin', value: 'first index value here' },
        { key: 'second_index_bin', value: 'second index value here' }
      ]
    }
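Since the indices come back as a list of key/value pairs, it can be convenient to look them up by name. The following is a minimal sketch under the assumption that the reply has the shape shown above; indicesToMap is a hypothetical helper and is not provided by riaks.

```javascript
// Hypothetical helper, not part of riaks: turns the `indices` list from a
// getWithKey reply (with returnMeta: true) into a plain lookup object.
// If the same index key appears more than once, the last value wins.
function indicesToMap(reply) {
  var map = {}
  var indices = reply.indices || []
  indices.forEach(function (index) {
    map[index.key] = index.value
  })
  return map
}

// Sample reply, copied from the shape shown above
var reply = {
  value: { foo: 'bar' },
  indices: [
    { key: 'first_index_bin', value: 'first index value here' },
    { key: 'second_index_bin', value: 'second index value here' }
  ]
}

console.log(indicesToMap(reply).first_index_bin)
```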
saveWithKey
Save a value for a key, optionally with secondary indices (returns a promise). In the following examples, we set two different secondary indices: test_index_one with value 45 and test_index_two with value foo.
- String value

    var opts = {
      bucket: 'test_bucket',
      key: 'test_key',
      indices: {
        test_index_one: '45',
        test_index_two: 'foo'
      },
      returnBody: true, // (optional) whether to return the contents of the stored object. defaults to false
      value: 'test_value_here'
    }
    var promise = client.saveWithKey(opts)
- Object value. JSON.stringify will be called on the value before it is saved to Riak, and the content-type: application/json header will be applied. When you get this value back from Riak via the client.getWithKey method, JSON.parse will be called so you receive an actual object back.

    var opts = {
      bucket: 'test_bucket',
      key: 'test_key',
      indices: {
        test_index_one: '45',
        test_index_two: 'foo'
      },
      returnBody: true, // (optional) whether to return the contents of the stored object. defaults to false
      value: {
        foo: 'bar'
      }
    }
    var promise = client.saveWithKey(opts)
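The round trip described above for object values can be illustrated without a Riak server. This is only a sketch of the idea, not the library's actual implementation: encodeValue and decodeValue are hypothetical names, and the text/plain content type used for strings is an assumption.

```javascript
// Sketch of the JSON round trip, assuming strings are stored as-is and
// everything else is stored as JSON. Not the actual riaks implementation.
function encodeValue(value) {
  if (typeof value === 'string') {
    // assumption: plain strings are stored unchanged as text/plain
    return { body: value, contentType: 'text/plain' }
  }
  return { body: JSON.stringify(value), contentType: 'application/json' }
}

function decodeValue(stored) {
  if (stored.contentType === 'application/json') {
    return JSON.parse(stored.body)
  }
  return stored.body
}

var stored = encodeValue({ foo: 'bar' })
console.log(stored.body)
console.log(decodeValue(stored))
```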
deleteWithKey
Delete a key in a given bucket (returns a promise).

    var opts = {
      bucket: 'test_bucket',
      key: 'test_key'
    }
    var promise = client.deleteWithKey(opts)
search
Search via the Solr-compatible interface.

    var opts = {
      index: bucket,
      q: 'value_*', // query
      df: 'bar', // default field
      start: 0,
      rows: 1, // limit number of results
      sort: 'bar',
      filter: '',
      presort: 'key'
    }
    var promise = client.search(opts)
The reply object in the example above will look like
    {
      numFound: 4,
      start: 0,
      maxScore: '0.00000e+0',
      docs: [
        {
          id: '1_key',
          index: 'http_test',
          fields: { bar: 'value_1' },
          props: {}
        }
      ]
    }
searchStream
Search via the Solr-compatible interface and automatically page through the results via a streaming interface.

    var opts = {
      index: bucket,
      q: 'value_*', // query
      df: 'bar', // default field
      start: 0,
      rows: 1, // limit number of results per page (optional, defaults to 20)
      maxRows: 20, // end the stream when reached; if not set, the client will stream all matching results
      sort: 'bar',
      filter: '',
      presort: 'key'
    }
    var readStream = client.searchStream(opts)
    readStream.on('data', dataHandler)

    function dataHandler(doc) {
      console.log(doc)
    }
The reply object passed to the dataHandler function in the example above will look like

    {
      id: '1_key',
      index: 'http_test',
      fields: { bar: 'value_1' },
      props: {}
    }
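To make the interaction between rows and maxRows concrete, the sketch below computes which pages a paginating client might request. It is an illustration under stated assumptions, not the riaks implementation: pagePlan is a hypothetical function, paging is assumed to start at 0, and trimming the last page to fit maxRows is an assumption.

```javascript
// Hypothetical sketch of search pagination, not the riaks implementation.
// numFound: total matches reported by the search
// rows:     page size (defaults to 20, as described above)
// maxRows:  optional cap on the total number of results streamed
function pagePlan(numFound, rows, maxRows) {
  var pageSize = rows || 20
  var limit = (maxRows === undefined) ? numFound : Math.min(numFound, maxRows)
  var pages = []
  for (var start = 0; start < limit; start += pageSize) {
    // assumption: the final page is trimmed so the total never exceeds limit
    pages.push({ start: start, rows: Math.min(pageSize, limit - start) })
  }
  return pages
}

console.log(pagePlan(45, 20))     // all matching results
console.log(pagePlan(45, 20, 30)) // stop after 30 results
```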
queryRangeStream
Stream back keys from a secondary index query. The appropriate suffix of either _int or _bin will be appended to the index key based on the type of the value in the start field. This maps to the secondary index HTTP interface in Riak: http://docs.basho.com/riak/latest/dev/references/http/secondary-indexes/
If you want the secondary index values in the output, specify returnTerms: true in the options object. Note that returnTerms corresponds to return_terms in the Riak HTTP options.
If you want to limit the number of results returned, specify maxResults: <integer value>. The maxResults option maps to the Riak max_results URL parameter.
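The suffix rule described above can be sketched as follows. This is an assumption about how the choice is made (numbers map to _int, everything else to _bin) rather than the library's actual code, and indexKeyWithSuffix is a hypothetical name.

```javascript
// Hypothetical sketch, not the riaks implementation: choose the secondary
// index suffix from the type of the `start` value.
// Assumption: numeric start values use `_int`, all others use `_bin`.
function indexKeyWithSuffix(indexKey, start) {
  var suffix = (typeof start === 'number') ? '_int' : '_bin'
  return indexKey + suffix
}

console.log(indexKeyWithSuffix('test_index_key', 10))
console.log(indexKeyWithSuffix('test_index_key', 'abc'))
```

Under this assumption, a numeric start such as 10 queries test_index_key_int, while a string start queries test_index_key_bin.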
- Keys only

    var opts = {
      bucket: 'test_bucket',
      indexKey: 'test_index_key', // `_bin` or `_int` suffix automatically added based on start value type
      start: '/x00',
      end: '/xff',
      returnTerms: false, // false by default if not specified
      maxResults: 10 // optional, limits the number of results returned
    }
    var keyStream = client.queryRangeStream(opts)
    keyStream.on('data', function (key) {
      console.log(key)
    })

- Keys and index values

    var opts = {
      bucket: 'test_bucket',
      indexKey: 'test_index_key', // `_bin` or `_int` suffix automatically added based on start value type
      start: '/x00',
      end: '/xff',
      returnTerms: true, // false by default if not specified
      maxResults: 10 // optional, limits the number of results returned
    }
    var keyStream = client.queryRangeStream(opts)
    keyStream.on('data', function (data) {
      console.log(data)
    })
The data objects emitted by the stream in the above example look like

    {
      key: 'secondary index value',
      value: 'key to actual object stored in riak'
    }
mapReduceStream
Run MapReduce jobs with arbitrary JavaScript functions and stream back the results. In the example below, the actual JavaScript functions for the map and reduce phases are passed in as parameters. The client will handle stringifying these functions before sending the mapred request to Riak.

    var opts = getOpts()
    var readStream = client.mapReduceStream(opts)
    readStream.on('data', function (data) {
      console.log(data)
    })

    function getOpts() {
      var mapPhaseOpts = {
        map: {
          fn: mapFunction,
          keep: false,
          arg: 'foo'
        }
      }
      var reducePhaseOpts = {
        reduce: {
          fn: reduceFunction,
          keep: true,
          arg: 'foo'
        }
      }
      var opts = {}
      var query = [mapPhaseOpts, reducePhaseOpts]
      // use a secondary index query as an input to the map reduce job
      var inputs = {
        bucket: 'test_bucket',
        index: 'test_index_key_bin',
        start: 'test_start',
        end: 'test_end'
      }
      opts.query = query
      opts.inputs = inputs
      opts.timeout = 1000 // optional, timeout in milliseconds (1 second here)
      return opts
    }

    function mapFunction(value) {
      var data = Riak.mapValuesJson(value)[0]
      var output = data
      return output
    }

    function reduceFunction(list) {
      // riak runs reduce jobs in batches of 20. If this job is the result of a previous reduce, return it directly.
      // see http://stackoverflow.com/questions/16359656/riak-map-reduce-in-js-returning-limited-data/20058732
      if (typeof list === 'number') return list
      var sum = list
      return sum
    }
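Because Riak may run a reduce function on batches of inputs and then feed the partial results back into the same function, a reduce function has to produce the same answer when applied to its own output. The sketch below is a simplified local model of that behavior, not Riak's actual scheduling; sumReduce and runBatched are hypothetical names, and the batch size of 20 is taken from the comment in the example above.

```javascript
// A reduce function that is safe to re-run on its own output: it always
// returns a list, and summing partial sums gives the same total.
function sumReduce(list) {
  var total = list.reduce(function (a, b) { return a + b }, 0)
  return [total]
}

// Simplified model of batched reduction (an assumption, not Riak's real
// scheduler): reduce each batch, then feed the partial results back into
// the same function until a single batch remains.
function runBatched(reduceFn, values, batchSize) {
  if (batchSize < 2) {
    throw new Error('batchSize must be at least 2')
  }
  var current = values
  while (current.length > batchSize) {
    var next = []
    for (var i = 0; i < current.length; i += batchSize) {
      next = next.concat(reduceFn(current.slice(i, i + batchSize)))
    }
    current = next
  }
  return reduceFn(current)
}

// 50 inputs (1..50) reduced in batches of 20
var inputs = []
for (var n = 1; n <= 50; n++) {
  inputs.push(n)
}
console.log(runBatched(sumReduce, inputs, 20))
```

In this model the batched result matches a single pass over all inputs, which is the property a reduce function needs when it may be re-run on partial results.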
purgeDB
Completely clear out all buckets in Riak (returns a promise).

    var promise = client.purgeDB()
disconnect
Close any open connections to the Riak server. This is really only needed for the protobuf protocol option, and is a no-op for the http protocol option to maintain a consistent public API.

    client.disconnect()
Test
Make sure you have Riak running on the default port before running the test suite.

    npm install
    make test # or npm test
To make testing easier, a Vagrantfile is included in the root of this repo. Bringing up this virtual machine will install the required dependencies so that you can test the riaks client without having to install Node.js and Riak on your local machine.

    vagrant up --provision
    vagrant ssh
    cd /vagrant
    npm install
    npm test