@ovotech/bigquery-pg-sink
2.0.1 • Public • Published

BigQuery PG Sink

Stream the results of a query made with nodejs-bigquery into a PostgreSQL database.

Using

yarn add @ovotech/bigquery-pg-sink

Creating a Sink

// the client is named db because it is passed as `pg: db` below
const db = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
const pgSink = new BigQueryPGSinkStream({
  pg: db,
  insert: insertQuery,
});

bigquery
  .createQueryStream('___BIGQUERY_QUERY_STRING___')
  .pipe(pgSink);

Creating insert functions

You can map each returned record directly to a single insert query:

import { RowMetadata } from '@google-cloud/bigquery';
import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
import { Client } from 'pg';

export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
  // one InsertBatch (query + values) per BigQuery row
  return rows.map((bigQueryResult: any) => ({
    query: `INSERT INTO table
      (
        id,
        balance
      ) VALUES ($1, $2)
    `,
    values: [bigQueryResult.id, bigQueryResult.balance],
  }));
};

You can speed up insertion with a bulk insert. This means building the query programmatically, because the number of placeholders depends on how many rows are passed to your insertQuery function:

import { RowMetadata } from '@google-cloud/bigquery';
import { BigQueryPGSinkStream, InsertBatch } from '@ovotech/bigquery-pg-sink';
import { Client } from 'pg';

export const insertQuery = (rows: RowMetadata): InsertBatch[] => {
  // transform each result into a flat array of values
  // i.e. [1, 200, 2, 300]
  const flatRows = rows.map(bigQueryResult => {
    return [
      bigQueryResult.id,
      bigQueryResult.balance,
    ]
  }).flat();
  
  // generate the values insert string, one placeholder group per row
  // i.e. ($1,$2),($3,$4),...
  const columns = [...Array(2)]; // one entry per inserted column (id, balance)
  const insertValuesString = rows
    .map(
      (_: unknown, rowIndex: number) =>
        `(${columns
          .map((_column: unknown, index: number) => '$' + (index + 1 + rowIndex * columns.length))
          .join(',')})`,
    )
    .join(',');

  return [{
    query: `INSERT INTO table
      (
        id,
        balance
      ) VALUES ${insertValuesString}
    `,
    values: flatRows,
  }];
};
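
The placeholder arithmetic above is the easiest part to get wrong, so it can help to isolate it in a small function. The following is a minimal sketch: buildPlaceholders is a hypothetical helper, not part of @ovotech/bigquery-pg-sink, and it assumes every row contributes the same number of columns.

```typescript
// Hypothetical helper (not exported by @ovotech/bigquery-pg-sink).
// Builds the placeholder list for a multi-row INSERT, numbering the
// parameters row by row to match a flat values array.
const buildPlaceholders = (rowCount: number, columnCount: number): string =>
  Array.from({ length: rowCount }, (_row, rowIndex) => {
    // placeholders for one row, e.g. ['$3', '$4'] for the second row of two columns
    const group = Array.from(
      { length: columnCount },
      (_col, colIndex) => `$${rowIndex * columnCount + colIndex + 1}`,
    );
    return `(${group.join(',')})`;
  }).join(',');

// Two rows of two columns each:
console.log(buildPlaceholders(2, 2)); // prints ($1,$2),($3,$4)
```

Under these assumptions, insertValuesString in the bulk example could be computed as buildPlaceholders(rows.length, 2), with flatRows supplying the matching values in the same order.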

Coding style (linting, etc) tests

Style is maintained with prettier and tslint:

yarn lint

Deployment

Deployment is performed by lerna automatically on merge / push to master, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.

Contributing

Have a bug? File an issue with a simple example that reproduces this so we can take a look & confirm.

Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see test folder).

License

This project is licensed under Apache 2. See the LICENSE file for details.
