@avanzu/eventstore

2.1.1 • Public • Published

Introduction

This is a heavily modified version of the original node-eventstore by Adriano Raiano.

The project goal is to provide an eventstore implementation for node.js:

  • load and store events via EventStream object
  • event dispatching to your publisher (optional)
  • supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable, dynamodb)
  • snapshot support
  • query your events

Upgrading from 1.x.x? See the Upgrade instructions section below.

Installation

npm install @avanzu/eventstore

Usage

Require the module and init the eventstore:

var eventstore = require('@avanzu/eventstore')

var es = eventstore()

By default the eventstore will use an inmemory Storage.

Logging

For logging and debugging you can use debug by TJ Holowaychuk. Simply run your process with:

DEBUG=@avanzu/eventstore/* node app.js

Provide implementation for storage

example with mongodb:

var es = require('@avanzu/eventstore')({
    type: 'mongodb',
    host: 'localhost', // optional
    port: 27017, // optional
    dbName: 'eventstore', // optional
    eventsCollectionName: 'events', // optional
    snapshotsCollectionName: 'snapshots', // optional
    transactionsCollectionName: 'transactions', // optional
    timeout: 10000, // optional
    // emitStoreEvents: true                        // optional, by default no store events are emitted
    // maxSnapshotsCount: 3                         // optional, by default all snapshots are kept
    // authSource: 'authenticationDatabase'         // optional
    // username: 'technicalDbUser'                  // optional
    // password: 'secret'                           // optional
    // url: 'mongodb://user:pass@host:port/db?opts' // optional
    // positionsCollectionName: 'positions'         // optional, by default positions are not tracked
})

catch connect and disconnect events

es.on('connect', function () {
    console.log('storage connected')
})

es.on('disconnect', function () {
    console.log('connection to storage is gone')
})

define event mappings [optional]

Define which values should be mapped/copied to the payload event.

es.defineEventMappings({
    id: 'id',
    commitId: 'commitId',
    commitSequence: 'commitSequence',
    commitStamp: 'commitStamp',
    streamRevision: 'streamRevision',
})
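
To make the effect of such a mapping more concrete, here is a minimal sketch in plain JavaScript. It is not the library's implementation: `applyMappings` and the shape of `storedEvent` are hypothetical, and the sketch only illustrates the idea of copying selected store metadata onto the payload under the configured property names.

```javascript
// Hypothetical illustration: copy metadata of a stored event onto its payload.
// Keys of `mappings` are metadata fields, values are payload property names.
function applyMappings(storedEvent, mappings) {
    const payload = { ...storedEvent.payload }
    for (const [metaKey, payloadKey] of Object.entries(mappings)) {
        if (storedEvent[metaKey] !== undefined) {
            payload[payloadKey] = storedEvent[metaKey]
        }
    }
    return payload
}

// Assumed event shape, for demonstration only.
const storedEvent = {
    id: 'evt-1',
    commitId: 'commit-1',
    streamRevision: 0,
    payload: { my: 'event' },
}

const mapped = applyMappings(storedEvent, {
    id: 'id',
    commitId: 'commitId',
    streamRevision: 'revision',
})
console.log(mapped)
```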

initialize

await es.init()

working with the eventstore

get the eventhistory (of an aggregate)

const { events } = await es.getEventStream({ query: 'streamId' })

or

const { events } = await es.getEventStream({ 
    query: {
        aggregateId: 'myAggregateId',
        aggregate: 'person', // optional
        context: 'hr', // optional
    }
})

'streamId' and 'aggregateId' are the same. In DDD terms, aggregate and context just make the language more precise. For example, you can have a 'person' aggregate in the context 'human resources' and a 'person' aggregate in the context 'business contracts'. So you can have two completely different aggregate instances of two completely different aggregates (perhaps with the same name) in two completely different contexts.
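
As a rough illustration of why context and aggregate matter, the sketch below derives a composite key from a query. `streamKey` is a hypothetical helper, not part of the package, and the way the store identifies streams internally may differ. The point is only that the same aggregateId can refer to different things in different contexts.

```javascript
// Hypothetical helper: build a composite key from a query to show that
// context, aggregate and aggregateId together describe what is addressed.
function streamKey(query) {
    const q = typeof query === 'string' ? { aggregateId: query } : query
    return [q.context || '*', q.aggregate || '*', q.aggregateId].join('/')
}

const hrPerson = { aggregateId: '42', aggregate: 'person', context: 'hr' }
const contractPerson = { aggregateId: '42', aggregate: 'person', context: 'contracts' }

console.log(streamKey(hrPerson))       // hr/person/42
console.log(streamKey(contractPerson)) // contracts/person/42
console.log(streamKey('42'))           // */*/42
```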

You can also limit the requested eventstream with a 'minimum revision number' and a 'maximum revision number':

const {events} = await es.getEventStream({
    query: 'streamId' || {/* query */ },
    revMin: 5,
    revMax: 8
})

store a new event and commit it to store

const stream = await es.getEventStream({ query: 'streamId' })

stream.addEvent({ my: 'event' })
stream.addEvents([{ my: 'event2' }])

await stream.commit()
console.log(stream.eventsToDispatch)

If you defined an event publisher function, the committed events will be dispatched to the provided publisher.

If you just want to load the last event as a stream, you can call getLastEventAsStream instead of getEventStream.

working with snapshotting

get snapshot and eventhistory from the snapshot point

const [snapshot, stream] = await es.getFromSnapshot({query: 'streamId'})

or

const [snapshot, stream] = await es.getFromSnapshot({
        query: {
            aggregateId: 'myAggregateId',
            aggregate: 'person', // optional
            context: 'hr', // optional
        }
    })

You can also limit the requested snapshot and eventstream with a 'maximum revision number':

const [snapshot, stream] = await es.getFromSnapshot({
    query: 'streamId' || { /* query */ },
    revMax: 8 // if you omit revMax or you define it as -1 it will retrieve until the end
})

create a snapshot point

const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
const snap = snapshot.data
const history = stream.events

// create a new snapshot depending on your rules
if (history.length > myLimit) {
    await es.createSnapshot({
        streamId: 'streamId',
        data: myAggregate.getSnap(),
        revision: stream.lastRevision,
        version: 1, // optional
    })

    // or

    await es.createSnapshot({
        aggregateId: 'myAggregateId',
        aggregate: 'person', // optional
        context: 'hr', // optional
        data: myAggregate.getSnap(),
        revision: stream.lastRevision,
        version: 1, // optional
    })
}

// go on: store new event and commit it
// stream.addEvents...

You can automatically clean older snapshots by configuring the number of snapshots to keep with maxSnapshotsCount in eventstore options.
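
The point of loading a snapshot together with the events after it is to rebuild the current aggregate state without replaying the whole history. A minimal sketch of that step, in plain JavaScript and independent of the store: `rehydrate`, `applyEvent` and the counter aggregate are hypothetical names, and the sketch assumes that each event carries its data in a `payload` property.

```javascript
// Rebuild aggregate state from snapshot data plus the events recorded after it.
// `applyEvent` is a hypothetical reducer supplied by your domain code.
function rehydrate(snapshotData, events, applyEvent) {
    return events.reduce(
        (state, event) => applyEvent(state, event.payload),
        snapshotData || {}
    )
}

// Hypothetical counter aggregate, used only for demonstration.
const applyEvent = (state, payload) => ({
    count: (state.count || 0) + (payload.increment || 0),
})

const snapData = { count: 10 }
const eventsAfterSnapshot = [
    { payload: { increment: 1 } },
    { payload: { increment: 2 } },
]

const state = rehydrate(snapData, eventsAfterSnapshot, applyEvent)
console.log(state) // { count: 13 }
```

With the store, `snapData` would correspond to snapshot.data and `eventsAfterSnapshot` to stream.events from getFromSnapshot.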

own event dispatching (no event publisher function defined)

const evts = await es.getUndispatchedEvents()
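
What you do with the undispatched events is up to you. The following sketch shows one possible loop. Only getUndispatchedEvents() is taken from the text above. `publish` and `markDispatched` are hypothetical callbacks that you would have to supply yourself, since this document does not show how an event is flagged as dispatched.

```javascript
// Publish every undispatched event, then hand it to a caller-supplied
// `markDispatched` callback. Both callbacks are hypothetical.
async function dispatchPending(es, publish, markDispatched) {
    const evts = await es.getUndispatchedEvents()
    let dispatched = 0
    for (const evt of evts) {
        await publish(evt)        // e.g. push to a message bus
        await markDispatched(evt) // flag the event so it is not sent again
        dispatched += 1
    }
    return dispatched
}
```

Events are handled one at a time on purpose: if `publish` rejects, the loop stops and the remaining events stay undispatched.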

Deleting aggregates

currently supported by:

  1. mongodb

You can delete an aggregate including the event history, snapshots and transactions by calling deleteStream.

const deletedStream = await es.deleteStream('myStreamId')

The return value is the EventStream that has just been deleted.

This stream will contain an undispatched TombstoneEvent ready to be processed. The payload attribute of that event contains the complete event history.

const [tombstoneEvent] = deletedStream.eventsToDispatch 

query your events

for replaying your events or for rebuilding a viewmodel or just for fun...

skip, limit always optional

var skip = 0,
    limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end

const events = await es.getEvents({skip, limit})

// or

const events = await es.getEvents({query: 'streamId', skip, limit})

// or

const events = await es.getEvents({
    query: {
        // free choice (all, only context, only aggregate, only aggregateId...)
        context: 'hr',
        aggregate: 'person',
        aggregateId: 'uuid',
    },
    skip,
    limit
})
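
For replaying a large store, skip and limit can be combined into simple paging. The sketch below assumes that getEvents({ query, skip, limit }) resolves to an array, as in the examples above. `eachEvent` and `pageSize` are hypothetical names introduced for this illustration.

```javascript
// Iterate over all matching events page by page.
// Stops as soon as a page comes back shorter than the requested size.
async function* eachEvent(es, { query, pageSize = 100 } = {}) {
    let skip = 0
    while (true) {
        const page = await es.getEvents({ query, skip, limit: pageSize })
        yield* page
        if (page.length < pageSize) return
        skip += pageSize
    }
}
```

Usage would look like `for await (const evt of eachEvent(es, { query: 'streamId' })) { ... }`.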

by revision

revMin, revMax always optional

const events = await es.getEventsByRevision({ 
    query: 'streamId', 
    revMin: 5, 
    revMax: 8  // if you omit revMax or you define it as -1 it will retrieve until the end
})
// or

const events = await es.getEventsByRevision({
    query: {
        aggregateId: 'myAggregateId',
        aggregate: 'person', // optional
        context: 'hr', // optional
    },
    revMin: 5, 
    revMax: 8  // if you omit revMax or you define it as -1 it will retrieve until the end
})

by commitStamp

skip, limit always optional

const events = await es.getEventsSince({
    commitStamp: new Date(2015, 5, 23), 
    skip: 10,
    limit: 100 // if you omit limit or you define it as -1 it will retrieve until the end
})

// or

const events = await es.getEventsSince({
    commitStamp: new Date(2015, 5, 23), 
    limit: 50
})

// or

const events = await es.getEventsSince({
    commitStamp: new Date(2015, 5, 23) 
})
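
Note that months in the JavaScript Date constructor are zero-based, so new Date(2015, 5, 23) means 23 June 2015 in the local time zone. If you want a commitStamp that does not depend on the machine's time zone, a UTC-based value is less ambiguous. A small sketch in plain JavaScript, independent of the store:

```javascript
// Months are zero-based: 5 means June, interpreted in the local time zone.
const local = new Date(2015, 5, 23)
console.log(local.getMonth()) // 5 (June)

// The same calendar day pinned to UTC midnight.
const utc = new Date(Date.UTC(2015, 5, 23))
console.log(utc.toISOString()) // 2015-06-23T00:00:00.000Z

// Equivalent, parsed from an ISO 8601 string.
const fromIso = new Date('2015-06-23T00:00:00.000Z')
console.log(utc.getTime() === fromIso.getTime()) // true
```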

streaming your events

Some databases support streaming your events. The api is similar to the query one.

skip, limit always optional

var skip = 0,
    limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end

var stream = es.streamEvents({skip, limit})
// or
var stream = es.streamEvents({query: 'streamId', skip, limit})
// or by commitstamp
var stream = es.streamEventsSince({commitStamp: new Date(2015, 5, 23), skip, limit })
// or by revision
var stream = es.streamEventsByRevision({
    query: {
        aggregateId: 'myAggregateId',
        aggregate: 'person',
        context: 'hr',
    }
})

stream.on('data', function (e) {
    doSomethingWithEvent(e)
})

stream.on('end', function () {
    console.log('no more events')
})

// or even better
stream.pipe(myWritableStream)

currently supported by:

  1. mongodb (driver version <= 4.0.0)

get the last event

for example to obtain the last revision number

const event = await es.getLastEvent('streamId')

// or

const event = await es.getLastEvent({ // free choice (all, only context, only aggregate, only aggregateId...)
  context: 'hr',
  aggregate: 'person',
  aggregateId: 'uuid'
});

obtain a new id

const id = await es.getNewId()

position of event in store

Some db implementations support recording the position of an event within the whole store, in addition to its streamRevision.

Currently these implementations support this:

  1. inmemory (by setting the trackPosition option)
  2. mongodb (by setting the positionsCollectionName option)

special scaling handling with mongodb

Inserting multiple events (documents) in mongodb is not atomic. Therefore the eventstore tries to repair itself when calling getEventsByRevision. If you want, you can also trigger this from outside:

const [firstTransaction] = await es.store.getPendingTransactions()

const lastEvent = await es.store.getLastEvent({
        aggregateId: firstTransaction.aggregateId,
        aggregate: firstTransaction.aggregate, // optional
        context: firstTransaction.context, // optional
})

await es.store.repairFailedTransaction(lastEvent)    
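
The snippet above repairs only the first pending transaction. A sketch of repairing all of them in sequence could look like this. It uses only the three store calls shown above. The loop itself and the name `repairAllPending` are assumptions, and it presumes that getLastEvent finds an event for every pending transaction.

```javascript
// Repair every pending transaction, one after another.
async function repairAllPending(es) {
    const pending = await es.store.getPendingTransactions()
    let repaired = 0
    for (const tx of pending) {
        // Look up the last stored event of the affected aggregate.
        const lastEvent = await es.store.getLastEvent({
            aggregateId: tx.aggregateId,
            aggregate: tx.aggregate,
            context: tx.context,
        })
        await es.store.repairFailedTransaction(lastEvent)
        repaired += 1
    }
    return repaired
}
```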

Upgrade instructions

From 1.x.x to 2.x.x

Starting with version 2.0.0 the eventstore no longer supports multiple positional arguments. Instead, you have to pass in a params object. The general idea that you only have to specify the arguments that deviate from the defaults remains.

Please refer to the following table to see how the signatures have changed:

1.x.x                                          2.x.x
streamEvents(query, skip, limit)               streamEvents({query, skip, limit})
streamEventsSince(commitStamp, skip, limit)    streamEventsSince({commitStamp, skip, limit})
streamEventsByRevision(query, revMin, revMax)  streamEventsByRevision({query, revMin, revMax})
getEvents(query, skip, limit)                  getEvents({query, skip, limit})
getEventsSince(commitStamp, skip, limit)       getEventsSince({commitStamp, skip, limit})
getEventsByRevision(query, revMin, revMax)     getEventsByRevision({query, revMin, revMax})
getEventStream(query, revMin, revMax)          getEventStream({query, revMin, revMax})
getFromSnapshot(query, revMax)                 getFromSnapshot({query, revMax})
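
If you cannot migrate all call sites at once, a thin wrapper can translate the old positional calls into the new params objects. The following is a sketch only: `legacyFacade` is a hypothetical helper, not part of the package. It assumes that properties left undefined are treated like omitted ones, and it does not handle callbacks that 1.x code may have passed.

```javascript
// Wrap a 2.x eventstore so that 1.x style positional calls keep working.
function legacyFacade(es) {
    return {
        getEvents: (query, skip, limit) => es.getEvents({ query, skip, limit }),
        getEventsSince: (commitStamp, skip, limit) =>
            es.getEventsSince({ commitStamp, skip, limit }),
        getEventsByRevision: (query, revMin, revMax) =>
            es.getEventsByRevision({ query, revMin, revMax }),
        getEventStream: (query, revMin, revMax) =>
            es.getEventStream({ query, revMin, revMax }),
        getFromSnapshot: (query, revMax) => es.getFromSnapshot({ query, revMax }),
    }
}
```

Usage would look like `const legacy = legacyFacade(es)` followed by `await legacy.getEvents('streamId', 0, 100)`.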

Database Support

Currently these databases are supported:

  1. inmemory
  2. mongodb (node-mongodb-native)
  3. redis (redis)
  4. tingodb (tingodb)
  5. azuretable (azure-storage)
  6. dynamodb (aws-sdk)

own db implementation

You can use your own db implementation by extending this...

var Store = require('@avanzu/eventstore').Store


class MyDB extends Store {
    constructor(options) {
        super(options)
    }
}

module.exports = MyDB

and you can use it in this way

var es = require('@avanzu/eventstore')({
    type: MyDB,
})
// es.init...
