Data Pusher
I am a simple ETL tool.
- I've got a logger and basic process control
- I've got connection types that I support:
  - Postgres
  - CSV
  - ... your own!
Philosophy
This ETL assumes the following:
- You want to replicate your data in a *streaming* manner, i.e. you want to always poll "sources" and only add the new/updated records to "destinations"
- The details of the source data's types are not that important, i.e. if an `int` becomes a `bigint`, that's OK
- You need a higher-level programming language as part of your ETL. Perhaps you are decorating your data with information from an API...
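That last point usually shows up as a per-row transform step. A minimal sketch of decorating rows with API data (the `enrich` function and the `lookupCountry` stub are hypothetical, not part of Data Pusher):

```javascript
// Hypothetical per-row transform: decorate each record with data
// from an external API (stubbed here so the sketch is self-contained).
const lookupCountry = async (ip) => {
  // In a real ETL this would be an HTTP call to a geo-IP service.
  return ip.startsWith('10.') ? 'internal' : 'US'
}

const enrich = async (rows) => {
  // Run the per-row lookups concurrently with Promise.all.
  return Promise.all(rows.map(async (row) => ({
    ...row,
    country: await lookupCountry(row.ip)
  })))
}
```

Because the lookups are awaited inside `Promise.all`, a slow API call on one row doesn't block the others.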
Why Node?
With async/await, Node is now the best way to program parallel processes that spend most of their time waiting for data. An ETL is largely asking one source for data, doing some simple (read: non-CPU-bound) transformation on that data, and then sending it off to another destination. With promise flow control, this becomes very simple!
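The flow-control pattern described above can be sketched as a read/transform/write chain per table, with all chains running concurrently (a toy illustration, not Data Pusher's actual API; the three step functions are stand-ins):

```javascript
// Toy sketch of promise-based ETL flow control: each table is a chain
// of awaited steps, and all chains run concurrently via Promise.all.
const read = async (table) => [`${table}-row1`, `${table}-row2`]   // stand-in for a source query
const transform = async (rows) => rows.map((r) => r.toUpperCase()) // cheap, non-CPU-bound tweak
const write = async (rows) => rows.length                          // stand-in for a destination write

const copyAll = async (tables) => {
  const counts = await Promise.all(tables.map(async (t) => {
    const rows = await read(t)        // waiting on the source...
    const out = await transform(rows) // ...a simple transformation...
    return write(out)                 // ...then waiting on the destination
  }))
  return counts.reduce((a, b) => a + b, 0) // total rows copied
}
```

Each `await` yields while I/O is in flight, so a handful of tables copy in roughly the time of the slowest one rather than the sum of all of them.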
Example
Say you want to move all the tables with data newer than X from one database to another. We will demo this with a Rails-like database, where an `updated_at` column can be used to check for new or updated records.
```javascript
const DataPusher = require('..')

const connections = {
  source: {
    type: 'pg',
    connectionString: process.env.SOURCE
  },
  destination: {
    type: 'pg',
    connectionString: process.env.DESTINATION
  }
}

const etl = new DataPusher(connections)
const updateColumns = ['updated_at', 'created_at']

const main = async () => {
  await etl.connect()
  let promises = []
  const tables = await etl.connections.source.read('tables')
  for (let i in tables) {
    promises.push(copyTable(tables[i]))
  }
  await Promise.all(promises)
  await etl.end()
}

const copyTable = async (table) => {
  let copyTypeMode = 'full'
  let tableUpdateCol
  const destinationTables = await etl.connections.destination.read('tables')
  if (destinationTables.includes(table)) {
    // The table already exists; if it has an update column we can copy deltas
    const columns = await etl.connections.destination.read('columns', table)
    tableUpdateCol = updateColumns.find((col) => columns.includes(col))
    if (tableUpdateCol) copyTypeMode = 'delta'
  }
  // The exact read/write ids and arguments depend on the pg connection;
  // see ./examples/simpleRails.js for the canonical version
  if (copyTypeMode === 'full') {
    await etl.connections.source.read('table', table, (data) =>
      etl.connections.destination.write('table', data, table))
  } else {
    const latest = await etl.connections.destination.read('latest', table, tableUpdateCol)
    await etl.connections.source.read('table', table, (data) =>
      etl.connections.destination.write('table', data, table), tableUpdateCol, latest)
  }
}

main()
```
This example can be run with node ./examples/simpleRails.js
Creating your own connections
Connections must support the following methods:
- async connect()
- async end()
- async read('id', handler, ...)
- async write('id', data, ...)
...plus any other methods you might want.
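For instance, a minimal in-memory connection satisfying this interface might look like the following. The `id` values (`'all'`, `'record'`) are illustrative: each connection defines its own ids, so these are assumptions, not part of Data Pusher's contract:

```javascript
// A minimal in-memory connection implementing the required interface.
// The ids a connection understands are up to the connection; here,
// read('all', handler) streams every stored record to the handler,
// and write('record', data) appends one record.
class MemoryConnection {
  constructor () {
    this.records = []
  }

  async connect () { /* nothing to open for an in-memory store */ }

  async end () { /* nothing to close */ }

  async read (id, handler) {
    if (id !== 'all') throw new Error(`unknown read id: ${id}`)
    for (const record of this.records) {
      await handler(record)
    }
  }

  async write (id, data) {
    if (id !== 'record') throw new Error(`unknown write id: ${id}`)
    this.records.push(data)
  }
}
```

A real connection (Postgres, CSV, an HTTP API) would follow the same shape, with `connect()`/`end()` managing the underlying resource and `read`/`write` dispatching on the id.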
Notes:
- I require Node.js v10+, as this project uses some of its helpers for pipes and filters
- I only speak Postgres (v9.5+ required for upserts)
- I only log to STDERR and STDOUT
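The v9.5 requirement exists because upserts rely on Postgres's `INSERT ... ON CONFLICT`, which was introduced in that release. A sketch of the kind of statement a Postgres destination would generate (the helper and the table/column names are hypothetical):

```javascript
// Build a Postgres upsert (INSERT ... ON CONFLICT) statement.
// ON CONFLICT is available from Postgres 9.5, hence the version floor.
const upsertSql = (table, columns, conflictKey) => {
  const cols = columns.join(', ')
  const params = columns.map((_, i) => `$${i + 1}`).join(', ')
  const updates = columns
    .filter((c) => c !== conflictKey)
    .map((c) => `${c} = EXCLUDED.${c}`)
    .join(', ')
  return `INSERT INTO ${table} (${cols}) VALUES (${params}) ` +
         `ON CONFLICT (${conflictKey}) DO UPDATE SET ${updates}`
}
```

`EXCLUDED` refers to the row that was proposed for insertion, so on a key collision the existing row is updated with the incoming values instead of raising an error.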
Thanks