mongo-river-elastic
https://www.npmjs.com/package/mongo-river-elastic The mongo-river-elastic project provides a Node.js wrapper over the MongoDB Node.js driver that synchronizes data from your MongoDB to Elasticsearch. It is tested and works best with the following:
- MongoDB v4.0+
- MongoDB Node.js Driver v3.2
- Elasticsearch v6.0+, v7.0+
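The following MongoDB functions are supported, as shown in the CRUDS examples: insert, insertOne, insertMany, update, updateOne, updateMany, deleteOne, deleteMany, replaceOne, bulkWrite.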
Getting Started
Before you use the library, note the following crucial points:
Collections
- The library does not auto-create Elasticsearch indexes; you have to create all indexes yourself.
- Every object in a collection gets an extra key called `river` during insert/update, and this key is indexed. It stores a timestamp and is used for syncing with Elasticsearch.
- To ensure consistency when Elasticsearch is unreachable, down, or busy, a MongoDB collection `river_backlog` is created, which contains a single document holding meta-information about objects yet to be synced. It is deleted once Elasticsearch is reachable and the sync is complete.
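Because indexes are not created automatically, it can help to create any missing ones before constructing the River object. The following is a minimal sketch, not part of mongo-river-elastic: `ensureIndexes` is a hypothetical helper, and it assumes the client exposes `indices.exists()` and `indices.create()` returning promises, as the legacy `elasticsearch` npm client does when no callback is passed.

```javascript
// Hypothetical helper (not part of mongo-river-elastic): create any missing
// Elasticsearch indexes named in a _collection_index_dict.
// Assumes esClient.indices.exists() resolves to a boolean and
// esClient.indices.create() resolves once the index has been created.
async function ensureIndexes(esClient, collectionIndexDict) {
  const created = [];
  for (const collection of Object.keys(collectionIndexDict)) {
    const index = collectionIndexDict[collection].index;
    const exists = await esClient.indices.exists({ index });
    if (!exists) {
      await esClient.indices.create({ index });
      created.push(index);
    }
  }
  // Names of the indexes that had to be created.
  return created;
}
```

The sketch creates indexes with default settings; if you need explicit mappings, you would pass them yourself when creating each index.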
Primary Key
- Elasticsearch expects a primary key for each insert. You have to specify the `primaryKeyField` in your MongoDB documents that the library should use.
- If you don't have a primary key field, use the MongoDB BSON id `_id`.
- The primary key in Elasticsearch is always a `string`.
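To make the string requirement concrete, here is a small illustration of reading a document's primary key and coercing it to a string. `primaryKeyOf` is a hypothetical function written for this example; the library's internal handling may differ.

```javascript
// Illustration only: read the configured primary key field from a document
// and coerce it to a string. Not the library's actual implementation.
function primaryKeyOf(doc, primaryKeyField) {
  const value = doc[primaryKeyField];
  if (value === undefined || value === null) {
    throw new Error(`Document has no value for "${primaryKeyField}"`);
  }
  return String(value);
}

// A numeric _id and a string uuid both end up as strings.
const idFromNumber = primaryKeyOf({ _id: 10, key: 'value' }, '_id');
const idFromUuid = primaryKeyOf({ uuid: '3', key: 'value3' }, 'uuid');
```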
Elasticsearch v7.0
- This Elasticsearch version deprecates the `_type` field.
- If your Elasticsearch is v7.0+, the library ignores the type value provided in `_collection_index_dict`.
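If the same configuration code has to serve both v6 and v7 clusters, one option is to build the dictionary entries with a small helper that leaves `type` as `null` on v7.0+. This is only a sketch: `buildEntry` and its `esMajorVersion` parameter are hypothetical and not part of the library.

```javascript
// Hypothetical helper: build one _collection_index_dict entry, setting
// `type` to null when the Elasticsearch major version is 7 or later.
function buildEntry(index, primaryKeyField, esMajorVersion, type) {
  const useType = esMajorVersion < 7 && typeof type === 'string';
  return { index, type: useType ? type : null, primaryKeyField };
}

// Dictionary for an Elasticsearch v7 cluster: every `type` is null.
const dictForV7 = {
  collection1: buildEntry('index1', '_id', 7, 'type1'),
  collection2: buildEntry('index2', 'uuid', 7)
};
```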
Prerequisites
Dependent packages
npm install mongodb
npm install elasticsearch
Installing
npm install mongo-river-elastic
npm install mongo-river-elastic --save
Initialize
const River = require('mongo-river-elastic')
let river = new River(_mongo_db_ref, _es_ref, _collection_index_dict, options)
Parameters:
- `_mongo_db_ref` -- Object
  - MongoDB database connection reference object
- `_es_ref` -- Object
  - Elasticsearch connection reference object
- `_collection_index_dict` -- Object
  - Object dictionary that contains the mapping between MongoDB collections and Elasticsearch index/type
  - Example
    _collection_index_dict = { 'collection1': {index: 'index1', type: 'type1', primaryKeyField: 'primaryKeyField1'}, 'collection2': {index: 'index2', type: 'type2', primaryKeyField: 'primaryKeyField2'}}
    where,
    - `collection1`: MongoDB collection name
    - `index1`: Elasticsearch index name
    - `type`: Elasticsearch index mapping name. Set to `null` if not applicable.
    - `primaryKeyField1`: key of the MongoDB collection's objects to be used as the primary key in Elasticsearch
- `options` -- Object
  - `loglevel` String
    - Sets the log level for River logging (Elasticsearch log)
    - info | error | debug
  - `retryCount` number
    - Number of retries if an Elasticsearch write fails
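A malformed `_collection_index_dict` is easy to produce by hand, so a pre-flight check before calling the constructor may be useful. The sketch below is not part of the library; `validateCollectionIndexDict` is a hypothetical helper that only checks the shape described in the parameter list above.

```javascript
// Hypothetical pre-flight check (not part of mongo-river-elastic): returns a
// list of problems found in a _collection_index_dict, or an empty list.
function validateCollectionIndexDict(dict) {
  const problems = [];
  for (const [collection, entry] of Object.entries(dict)) {
    if (!entry || typeof entry !== 'object') {
      problems.push(`${collection}: entry must be an object`);
      continue;
    }
    if (typeof entry.index !== 'string' || entry.index === '') {
      problems.push(`${collection}: index must be a non-empty string`);
    }
    if (typeof entry.primaryKeyField !== 'string' || entry.primaryKeyField === '') {
      problems.push(`${collection}: primaryKeyField must be a non-empty string`);
    }
    // type may be null or omitted when it is not applicable.
    if (entry.type !== null && entry.type !== undefined && typeof entry.type !== 'string') {
      problems.push(`${collection}: type must be a string or null`);
    }
  }
  return problems;
}
```

Calling it on the dictionary you intend to pass and refusing to start when the returned list is non-empty keeps configuration mistakes from surfacing later as failed syncs.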
Examples
Setup River Object
const elasticsearch = require('elasticsearch');
const MongoClient = require('mongodb').MongoClient;
const River = require('mongo-river-elastic');
// Connect to MongoDB (the connection string needs the mongodb:// scheme)
MongoClient.connect('mongodb://localhost:27017', function (err, mongoClient) {
  if (err) throw err;
  const _mongo_db_ref = mongoClient.db('test_database');
  // Connect to Elasticsearch
  const _es_ref = new elasticsearch.Client({host: 'localhost:9200'});
  // Init River
  const _collection_index_dict = {
    'collection1': {index: 'index1', type: 'type1', primaryKeyField: '_id'},
    'collection2': {index: 'index2', type: 'type2', primaryKeyField: 'uuid'}
  };
  const options = {logLevel: 'debug', retryCount: 3};
  let river = new River(_mongo_db_ref, _es_ref, _collection_index_dict, options);
});
CRUDS
- River CRUD functions accept the same options as their MongoDB counterparts.
- The only difference is that the first argument to any CRUD function is the name of the collection.
- `mongodb_options` is optional, as in MongoDB.
- INSERT
const mongodb_options = {}
//insert, insertOne, insertMany
river.insertOne('collection1', {'_id': '1', 'key': 'value'}, mongodb_options, (err, response) => {
...
});
river.insertMany('collection2', [
{'uuid': '1', 'key': 'value1'},
{'uuid': '2', 'key': 'value2'},
{'uuid': '3', 'key': 'value3'}], (err, response) => {
...
});
- UPDATE
//update, updateOne, updateMany
river.updateOne('collection1', {'key': 'value'}, {$set: {'anotherKey': 'anotherValue'}}, mongodb_options, (err, response) => {
...
});
- DELETE
//deleteOne, deleteMany
river.deleteOne('collection1', {'key': 'value'}, mongodb_options, (err, response) => {
...
});
- REPLACE
river.replaceOne('collection1', {'key': 'value'}, {
'_id': 10,
'key': 'value',
'anotherKey': 'another value'
}, (err, response) => {
});
- BULK WRITE
river.bulkWrite('collection1',
[
{insertOne: {document: {'_id': '1', 'key': 'value'}}},
{updateOne: {filter: {'key': 'value'}, update: {$set: {'anotherKey': 'anotherValue'}}, upsert: true}},
{deleteOne: {filter: {'key': 'value'}}}
], mongodb_options, (err, response) => {
});
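The CRUD examples above all use Node-style `(err, response)` callbacks. If you prefer promises, a thin wrapper built on Node's `util.promisify` is one possible approach. This is a sketch under that assumption: `promisifyRiver` is a hypothetical helper, not part of the library, and it only works for methods that take the callback as their last argument.

```javascript
const util = require('util');

// Hypothetical wrapper (not part of mongo-river-elastic): expose selected
// River methods as promise-returning functions. Assumes each method accepts
// a Node-style (err, response) callback as its last argument.
function promisifyRiver(river, methodNames) {
  const wrapped = {};
  for (const name of methodNames) {
    // bind() keeps `this` pointing at the original river object.
    wrapped[name] = util.promisify(river[name].bind(river));
  }
  return wrapped;
}

// Usage sketch, given a `river` object set up as in the examples:
// const riverAsync = promisifyRiver(river, ['insertOne', 'updateOne']);
// const response = await riverAsync.insertOne('collection1', {'_id': '1', 'key': 'value'}, {});
```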
License
This project is licensed under the MIT License