subscribe-me
Events subscriptions CRUD and dispatcher. In an event driven microservices architecture it helps to keep generic events but specific subscriptions.
Install
npm:
npm i subscribe-me
Yarn:
yarn add subscribe-me
Usage
;; // memory storage is for testing/experiments purposeconst storage = ;const subscriber = ; // you can subscribe to an eventconst id = subscriber; // you can use bufferMilliseconds to setup an event time buffer// this is usful to reduce the amount of subscriptions storage queries// it works grouping the events by event type and performing only one query per event typeconst notifier = ; // getNotification method accept a rxjs stream of events and return a rxjs stream of notificationsconst notifications = notifier; // dispatch notification to console log// this subscribe is the rxjs Observable subscribe methodnotifications; // you can unsubscribe from the event using the subscription idsubscriber;
Events Time Buffer
As you see in the above example, it is possible to setup an event time buffer using the notifier bufferMilliseconds
setting. Lets say we have the subscriptions stored in a postgres database and we have an income events stream of 1000 events in 10 seconds, if we have the bufferMilliseconds
set to 10000 (10 s) and those 1000 events comes in 3 different event types, the notifier gonna perform 3 queries to the postgres database (one per each event type) in those 10 seconds. If we have the bufferMilliseconds
set to 5000 (5 s), and those 1000 events are evenly distributed in those 10 seconds, we have more or less 500 events (of those 3 types) in each 5 seconds buffer, so the notifier will perform 3 queries in 5 seconds (6 each 10 seconds). So having a longer time buffer reduce the amount of storage accesses. The downside is that the time buffer introduce latency between the incoming events and outcoming notifications.
PostgreSQL Storage
It is possible to create a postgres storage providing the postgres connection configuration. Besides postgres specific confuguration setting you can provide the subscription table name and the cbhunk size. The database is accesed via a cursor that get the data by chunks, if the chunk size is 1000 then each chunk will contain 1000 table rows.
; const storage =
Also you can provide directly the connection pool, so for instance, you can allow postgres get its configuration from environment variables.
;; const storage =
API
Table of Contents
createMemoryStorage
Creates a memory storage, it is used just for tesnting and experiments.
Returns Object the storage to be used to create a notifier or a subscriber.
createPostgresStorage
Creates PostgreSQL storage.
Parameters
options
Object Postgres configurationoptions.user
string db useroptions.password
string db user passwordoptions.host
string db hostoptions.port
number db hostoptions.database
string db nameoptions.pool
Object or you can provide just a connection pool instance instead of the previous settingsoptions.chunkSize
number the chunk size (optional, default1000
)options.table
string subscription table name (optional, default"event_subscriptions"
)
Returns Object The storage to be used to create a notifier or a subscriber.
createSubscriber
Create the suubscriber.
Parameters
Returns any The subscriber with the subscribe
and unsubscribe
methods.
createNotifier
Creates the notifier.
Parameters
options
Object configuration options
Returns any The notifier with the getNotification
method, it receive the input event stream and return the notification event stream (rxjs observable)
License
MIT © Agustin Lascialandare