Streamr JavaScript Client
By using this client, you can easily interact with the Streamr API from JavaScript-based environments, such as browsers and node.js. You can, for example, subscribe to real-time data in streams, produce new data to streams, and create new streams.
The client uses websockets for producing and consuming messages to/from streams. It should work in all modern browsers.
Breaking changes notice
- Dec 31st 2020: Email/password login will be removed. Connect your email account to an Ethereum address before the deadline!
- Dec 31st 2020: Stream storage becomes opt-in and freely choosable.
- Dec 31st 2020: Support for API keys will end.
- Date TBD: Support for unsigned data will be dropped.
Installation · Usage · Client options · Authentication options · Message handler callback · StreamrClient object · Stream object · Subscription options · Data Unions · Utility functions · Events · Partitioning · Logging · NPM Publishing
Installation
The client is available on npm and can be installed simply by:
npm install streamr-client
Usage
Here are some quick examples. More detailed examples for the browser and node.js can be found here.
If you don't have an Ethereum account you can use the utility function StreamrClient.generateEthereumAccount()
, which returns the address and private key of a fresh Ethereum account.
Creating a StreamrClient instance
const client = new StreamrClient({
auth: {
privateKey: 'your-private-key'
}
})
When using Node.js remember to require the library with:
const StreamrClient = require('streamr-client')
Subscribing to real-time events in a stream
const sub = client.subscribe(
{
stream: 'streamId',
partition: 0, // Optional, defaults to zero. Use for partitioned streams to select partition.
// optional resend options here
},
(message, metadata) => {
// This is the message handler which gets called for every incoming message in the stream.
// Do something with the message here!
}
)
Resending historical data
const sub = await client.resend(
{
stream: 'streamId',
resend: {
last: 5,
},
},
(message) => {
// This is the message handler which gets called for every received message in the stream.
// Do something with the message here!
}
)
See "Subscription options" for resend options
Programmatically creating a stream
client.getOrCreateStream({
name: 'My awesome stream created via the API',
})
.then((stream) => {
console.log(`Stream ${stream.id} has been created!`)
// Do something with the stream, for example call stream.publish(message)
})
Publishing data points to a stream
// Here's our example data point
const msg = {
temperature: 25.4,
humidity: 10,
happy: true
}
// Publish using the stream id only
client.publish('my-stream-id', msg)
// The first argument can also be the stream object
client.publish(stream, msg)
// Publish with a specific timestamp as a Date object (default is now)
client.publish('my-stream-id', msg, new Date(54365472))
// Publish with a specific timestamp in ms
client.publish('my-stream-id', msg, 54365472)
// Publish with a specific timestamp as a ISO8601 string
client.publish('my-stream-id', msg, '2019-01-01T00:00:00.123Z')
// Publish with a specific partition key (read more about partitioning further down this readme)
client.publish('my-stream-id', msg, Date.now(), 'my-partition-key')
// For convenience, stream.publish(...) equals client.publish(stream, ...)
stream.publish(msg)
Client options
Option | Default value | Description |
---|---|---|
url | wss://streamr.network/api/v1/ws | Address of the Streamr websocket endpoint to connect to. |
restUrl | https://streamr.network/api/v1 | Base URL of the Streamr REST API. |
auth | {} | Object that can contain different information to authenticate. More details below. |
publishWithSignature | 'auto' | Determines if data points published to streams are signed or not. Possible values are: 'auto', 'always' and 'never'. Signing requires auth.privateKey or auth.provider . 'auto' will sign only if one of them is set. 'always' will throw an exception if none of them is set. |
verifySignatures | 'auto' | Determines under which conditions signed and unsigned data points are accepted or rejected. 'always' accepts only signed and verified data points. 'never' accepts all data points. 'auto' verifies all signed data points before accepting them and accepts unsigned data points only for streams not supposed to contain signed data. |
autoConnect | true | If set to true , the client connects automatically on the first call to subscribe() . Otherwise an explicit call to connect() is required. |
autoDisconnect | true | If set to true , the client automatically disconnects when the last stream is unsubscribed. Otherwise the connection is left open and can be disconnected explicitly by calling disconnect() . |
orderMessages | true | If set to true , the subscriber handles messages in the correct order, requests missing messages and drops duplicates. Otherwise, the subscriber processes messages as they arrive without any check. |
maxPublishQueueSize | 10000 | Only in effect when autoConnect = true . Controls the maximum number of messages to retain in internal queue when client has disconnected and is reconnecting to Streamr. |
publisherGroupKeys | {} | Object defining the group key as a hex string used to encrypt for each stream id. |
publisherStoreKeyHistory | true | If true , the client will locally store every key used to encrypt messages at some point. If set to false , the client will not be able to answer subscribers asking for historical keys during resend requests. |
subscriberGroupKeys | {} | Object defining, for each stream id, an object containing the group key used to decrypt for each publisher id. Not needed if keyExchange is defined. |
keyExchange | {} | Defines RSA key pair to use for group key exchange. Can define publicKey and privateKey fields as strings in the PEM format, or stay empty to generate a key pair automatically. Can be set to null if no key exchange is required. |
Authentication options
Authenticating with an API key has been deprecated.
Support for email/password authentication will be dropped in the future and cryptographic keys/wallets will be the only supported method.
If you don't have an Ethereum account you can use the utility function StreamrClient.generateEthereumAccount()
, which returns the address and private key of a fresh Ethereum account.
Authenticating with Ethereum also automatically creates an associated Streamr user, even if it doesn't already exist. Under the hood, the client will cryptographically sign a challenge to authenticate you as a Streamr user:
new StreamrClient({
auth: {
privateKey: 'your-private-key'
}
})
Authenticating with an Ethereum private key contained in an Ethereum (web3) provider:
new StreamrClient({
auth: {
provider: window.ethereum,
}
})
(Authenticating with a pre-existing session token, for internal use by the Streamr app):
new StreamrClient({
auth: {
sessionToken: 'session-token'
}
})
Message handler callback
The second argument to client.subscribe(options, callback)
is the callback function that will be called for each message as they arrive. Its arguments are as follows:
Argument | Description |
---|---|
payload | A JS object containing the message payload itself |
streamMessage | The whole StreamMessage object containing various metadata, for example streamMessage.getTimestamp() etc. |
StreamrClient object
Connecting
Name | Description |
---|---|
connect() | Connects to the server, and also subscribes to any streams for which subscribe() has been called before calling connect() . Returns a Promise. Rejects if already connected or connecting. |
disconnect() | Disconnects from the server, clearing all subscriptions. Returns a Promise. Rejects if already disconnected or disconnecting. |
pause() | Disconnects from the server without clearing subscriptions. |
ensureConnected() | Safely connects if not connected. Returns a promise. Resolves immediately if already connected. Only rejects if an error occurs during connection. |
ensureDisconnected() | Safely disconnects if not disconnected. Returns a promise. Resolves immediately if already disconnected. Only rejects if an error occurs during disconnection. |
Managing subscriptions
Name | Description |
---|---|
subscribe(options, callback) | Subscribes to a stream. Messages in this stream are passed to the callback function. See below for subscription options. Returns a Subscription object. |
unsubscribe(Subscription) | Unsubscribes the given Subscription . |
unsubscribeAll(streamId ) |
Unsubscribes all Subscriptions for streamId . |
getSubscriptions(streamId ) |
Returns a list of Subscriptions for streamId . |
Stream API
All the below functions return a Promise which gets resolved with the result.
Name | Description |
---|---|
getStream(streamId) | Fetches a stream object from the API. |
listStreams(query) | Fetches an array of stream objects from the API. For the query params, consult the API docs. |
getStreamByName(name) | Fetches a stream which exactly matches the given name. |
createStream(properties) | Creates a stream with the given properties. For more information on the stream properties, consult the API docs. |
getOrCreateStream(properties) | Gets a stream with the id or name given in properties , or creates it if one is not found. |
publish(streamId, message, timestamp, partitionKey) | Publishes a new message to the given stream. |
Listening to state changes of the client
on(eventName, function) | Binds a function
to an event called eventName
once(eventName, function) | Binds a function
to an event called eventName
. It gets called once and then removed.
removeListener(eventName, function) | Unbinds the function
from events called eventName
Stream object
All the below functions return a Promise which gets resolved with the result.
Name | Description |
---|---|
update() | Updates the properties of this stream object by sending them to the API. |
delete() | Deletes this stream. |
getPermissions() | Returns the list of permissions for this stream. |
hasPermission(operation, user) | Returns a permission object, or null if no such permission was found. Valid operation values for streams are: stream_get, stream_edit, stream_delete, stream_publish, stream_subscribe, and stream_share. user is the username of a user, or null for public permissions. |
grantPermission(operation, user) | Grants the permission to do operation to user , which are defined as above. |
revokePermission(permissionId) | Revokes a permission identified by its id . |
detectFields() | Updates the stream field config (schema) to match the latest data point in the stream. |
publish(message, timestamp, partitionKey) | Publishes a new message to this stream. |
Subscription options
Note that only one of the resend options can be used for a particular subscription. The default functionality is to resend nothing, only subscribe to messages from the subscription moment onwards.
Name | Description |
---|---|
stream | Stream id to subscribe to |
partition | Partition number to subscribe to. Defaults to partition 0. |
resend | Object defining the resend options. Below are examples of its contents. |
groupKeys | Object defining the group key as a hex string for each publisher id of the stream. |
// Resend N most recent messages
resend: {
last: 10,
}
// Resend from a specific message reference up to the newest message
resend: {
from: {
timestamp: 12345,
sequenceNumber: 0, // optional
}
publisher: 'publisherId', // optional
msgChainId: 'msgChainId', // optional
}
// Resend a limited range of messages
resend: {
from: {
timestamp: 12345,
sequenceNumber: 0, // optional
},
to: {
timestamp: 54321,
sequenceNumber: 0, // optional
},
publisher: 'publisherId', // optional
msgChainId: 'msgChainId', // optional
}
If you choose one of the above resend options when subscribing, you can listen on the completion of this resend by doing the following:
const sub = client.subscribe(...)
sub.on('initial_resend_done', () => {
console.log('All caught up and received all requested historical messages! Now switching to real time!')
})
Data Unions
This library provides functions for working with Data Unions. All of the below methods return a Promise.
Admin functions
Name | Returns | Description |
---|---|---|
deployDataUnion() | Transaction | Deploy a new Data Union |
createSecret(dataUnionContractAddress, secret[, name]) | Create a secret for a Data Union | |
dataUnionIsReady(address) | Wait until a new Data Union is initialized by its Operator | |
addMembers(dataUnionContractAddress, memberAddressList) | Add members | |
kick(dataUnionContractAddress, memberAddressList) | Kick members out from Data Union |
const dataUnion = await client.deployDataUnion()
dataUnion.address // already available before deployment
await dataUnion.deployed() // waits until contract is deployed
await dataUnion.isReady() // waits until data union is operated
Member functions
Name | Returns | Description |
---|---|---|
joinDataUnion(dataUnionContractAddress[, secret]) | JoinRequest | Join a Data Union |
hasJoined(dataUnionContractAddress[, memberAddress]) | Wait until member has been accepted | |
validateProof(dataUnionContractAddress, options) | true/false | Check that server is giving a proof that allows withdrawing |
withdraw(dataUnionContractAddress, options) | Receipt | Withdraw funds from Data Union |
withdrawFor(memberAddress, dataUnionContractAddress, options) | Receipt | Pay for withdraw transaction on behalf of a Data Union member |
withdrawTo(recipientAddress, dataUnionContractAddress, options) | Receipt | Donate/move your earnings to recipientAddress instead of your memberAddress |
The options object for withdraw functions above may contain following overrides:
Property | Default | Description |
---|---|---|
wallet | auth | ethers.js Wallet object to use to sign and send withdraw transaction |
provider | mainnet | ethers.js Provider to use if wallet wasn't provided |
confirmations | 1 | Number of blocks to wait after the withdraw transaction is mined |
gasPrice | ethers.js | Probably uses the network estimate |
Query functions
These are available for everyone and anyone, to query publicly available info from a Data Union:
Name | Returns | Description |
---|---|---|
getMemberStats(dataUnionContractAddress[, memberAddress]) | {earnings, proof, ...} | Get member's stats |
getDataUnionStats(dataUnionContractAddress) | {memberCount, totalEarnings, ...} | Get Data Union's statistics |
getMembers(dataUnionContractAddress) | [{address, earnings}, ...] | Get Data Union's members |
Utility functions
Name | Description |
---|---|
StreamrClient.generateEthereumAccount() | Generates a random Ethereum private key and returns an object with fields address and privateKey. Note that this private key can be used to authenticate to the Streamr API by passing it in the authentication options, as described earlier in this document. |
Events
Binding to events
The client and the subscriptions can fire events as detailed below. You can bind to them using on
:
// The StreamrClient emits various events
client.on('connected', () => {
console.log('Yeah, we are connected now!')
})
// So does the Subscription object
const sub = client.subscribe(...)
sub.on('subscribed', () => {
console.log(`Subscribed to ${sub.streamId}`)
})
Events on the StreamrClient instance
Name | Handler Arguments | Description |
---|---|---|
connected | Fired when the client has connected (or reconnected). | |
disconnected | Fired when the client has disconnected (or paused). |
Events on the Subscription object
Name | Handler Arguments | Description |
---|---|---|
subscribed | Fired when a subscription request is acknowledged by the server. | |
unsubscribed | Fired when an unsubscription is acknowledged by the server. | |
resending | ResendResponseResending | Fired when the subscription starts resending. Followed by the resent event to mark completion of the resend after resent messages have been processed by the message handler function. |
resent | ResendResponseResent | Fired after resending when the subscription has finished resending. |
no_resend | ResendResponseNoResend | This will occur instead of the resending - resent sequence in case there were no messages to resend. |
error | Error object | Reports errors, for example problems with message content |
Partitioning
Partitioning (sharding) enables streams to scale horizontally. This section describes how to use partitioned streams via this library. To learn the basics of partitioning, see the docs.
Creating partitioned streams
By default, streams only have 1 partition when they are created. The partition count can be set to any positive number (1-100 is reasonable). An example of creating a partitioned stream using the JS client:
client.createStream({
name: 'My partitioned stream',
partitions: 10,
}).then(stream => {
console.log(`Stream created: ${stream.id}. It has ${stream.partitions} partitions.`)
})
Publishing to partitioned streams
In most use cases, a user wants related events (e.g. events from a particular device) to be assigned to the same partition, so that the events retain a deterministic order and reach the same subscriber(s) to allow them to compute stateful aggregates correctly.
The library allows the user to choose a partition key, which simplifies publishing to partitioned streams by not requiring the user to assign a partition number explicitly. The same partition key always maps to the same partition. In an IoT use case, the device id can be used as partition key; in user interaction data it could be the user id, and so on.
The partition key can be given as an argument to the publish
methods, and the library assigns a deterministic partition number automatically:
client.publish('my-stream-id', msg, Date.now(), msg.vehicleId)
// or, equivalently
stream.publish(msg, Date.now(), msg.vehicleId)
Subscribing to partitioned streams
By default, the JS client subscribes to the first partition (partition 0
) in a stream. The partition number can be explicitly given in the subscribe call:
client.subscribe(
{
stream: 'my-stream-id',
partition: 4, // defaults to 0
},
(payload) => {
console.log(`Got message ${JSON.stringify(payload)}`)
},
)
Or, to subscribe to multiple partitions, if the subscriber can handle the volume:
const handler = (payload, streamMessage) => {
console.log(`Got message ${JSON.stringify(payload)} from partition ${streamMessage.getStreamPartition()}`)
}
[2,3,4].forEach(partition => {
client.subscribe(
{
stream: 'my-stream-id',
partition: partition,
},
handler,
)
})
Logging
The Streamr JS client library supports debug for logging.
In node.js, start your app like this: DEBUG=StreamrClient* node your-app.js
In the browser, include debug.js
and set localStorage.debug = 'StreamrClient'
Publishing
Publishing to NPM is automated via Github Actions. Follow the steps below to publish latest
or beta
.
latest
:
Publishing - Update version with either
npm version [patch|minor|major]
. Use semantic versioning https://semver.org/. Files package.json and package-lock.json will be automatically updated, and an appropriate git commit and tag created. git push --follow-tags
- Wait for Github Actions to run tests
- If tests passed, Github Actions will publish the new version to NPM
beta
:
Publishing - Update version with either
npm version [prepatch|preminor|premajor] --preid=beta
. Use semantic versioning https://semver.org/. Files package.json and package-lock.json will be automatically updated, and an appropriate git commit and tag created. git push --follow-tags
- Wait for Github Actions to run tests
- If tests passed, Github Actions will publish the new version to NPM