@ovotech/kafka-pg-sink

    1.1.3 • Public • Published

    PG Stream

    Store kafka-node events into a postgres database.

    Using

    yarn add @ovotech/kafka-pg-sink

    Each topic from the consumer stream gets its own "resolver" function that converts the message to an array of values, directly inserted as columns in the database.

    INSERT INTO tableName VALUES (column1Value, column2Value ...) ON CONFLICT DO NOTHING

    import { PGSinkStream, Message } from '@ovotech/kafka-pg-sink';
    import { ConsumerGroupStream } from 'kafka-node';
    import { Client } from 'pg';
     
    const consumerStream = new ConsumerGroupStream(
      { kafkaHost: 'localhost:29092', groupId: 'my-group', encoding: 'buffer' },
      ['migration-completed'],
    );
     
    const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
    const pgSink = new PGSinkStream({
      pg,
      topics: {
        'migration-completed': {
          table: 'migration_completed',
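          resolver: (message: Message) => {
            // getDataSomehow is a placeholder for your own decoding logic
            const data = getDataSomehow(message.value);
            // One array entry per table column, in column order
            return [data.column1, data.column2, data];
          },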
        },
      },
    });
     
    consumerStream.pipe(pgSink);
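To make the resolver-to-INSERT mapping described above more concrete, here is a minimal sketch of how an array of resolved values could be turned into a parameterized statement. The `buildInsert` helper is hypothetical and is not part of @ovotech/kafka-pg-sink; the library's actual implementation may differ. The table name is interpolated directly, so the sketch assumes it comes from trusted configuration.

```typescript
// Hypothetical helper for illustration only; not the library's own code.
export interface InsertQuery {
  text: string;
  values: unknown[];
}

// Builds one positional placeholder ($1, $2, ...) per resolved value.
// The table name is assumed to be trusted, since it is not escaped here.
export function buildInsert(table: string, values: unknown[]): InsertQuery {
  const placeholders = values.map((_, index) => `$${index + 1}`).join(', ');
  return {
    text: `INSERT INTO ${table} VALUES (${placeholders}) ON CONFLICT DO NOTHING`,
    values,
  };
}
```

The returned object has the `{ text, values }` shape that a pg client accepts in `query`, so the values are passed as parameters rather than concatenated into the SQL string.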

    Message is the same as the type from kafka-node, but is more lenient on the "value" field. This allows you to pre-process the message before sending it to pg-sink, for example by deserializing it with Avro.

    export interface Message {
      topic: string;
      value: any;
      offset?: number;
      partition?: number;
      highWaterOffset?: number;
      key?: string;
    }

    Usage with a deserializer

    You can transform kafka events with a transform stream before they arrive at the sink, for example with @ovotech/avro-stream.

    import { AvroDeserializer, AvroMessage } from '@ovotech/avro-stream';
    import { PGSinkStream } from '@ovotech/kafka-pg-sink';
    import { ConsumerGroupStream } from 'kafka-node';
    import { Client } from 'pg';
     
    const consumerStream = new ConsumerGroupStream(
      { kafkaHost: 'localhost:29092', groupId: 'my-group', encoding: 'buffer' },
      ['migration-completed'],
    );
    const deserializer = new AvroDeserializer('http://localhost:8080');
    const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');
    const pgSink = new PGSinkStream({
      pg,
      topics: {
        'migration-completed': {
          table: 'migration_completed',
          resolver: (message: AvroMessage) => [message.value.accountId],
        },
      },
    });
     
    consumerStream.pipe(deserializer).pipe(pgSink);
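The same pattern should work for other encodings. As a rough sketch, assuming messages carry JSON in a Buffer or string `value`, a custom object-mode transform could look like the following. `JsonValueParser` and `parseJsonValue` are hypothetical names, and the local `Message` interface simply mirrors the one shown earlier so that the block is self-contained.

```typescript
import { Transform } from 'stream';

// Local copy of the Message shape documented above, for self-containment.
interface Message {
  topic: string;
  value: any;
  offset?: number;
  partition?: number;
  highWaterOffset?: number;
  key?: string;
}

// Returns a copy of the message with its value parsed as JSON.
export function parseJsonValue(message: Message): Message {
  const raw = Buffer.isBuffer(message.value)
    ? message.value.toString('utf8')
    : String(message.value);
  return { ...message, value: JSON.parse(raw) };
}

// Object-mode transform that applies parseJsonValue to every message.
export class JsonValueParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(
    message: Message,
    _encoding: BufferEncoding,
    callback: (error?: Error | null, data?: unknown) => void,
  ): void {
    try {
      callback(null, parseJsonValue(message));
    } catch (error) {
      // Invalid JSON surfaces as a stream error
      callback(error as Error);
    }
  }
}
```

Under these assumptions it would be piped in the same position as the deserializer: `consumerStream.pipe(new JsonValueParser()).pipe(pgSink)`.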

    Errors

    PGSinkStream can emit a PGSinkError or a PGSinkMultipleError, depending on whether it was processing batched or normal requests.

    PGSinkError

    Property        Description
    message         Original error message
    chunk           The event sent from the previous stream to be saved to the database (Message)
    encoding        The buffer encoding
    originalError   The original error object that was triggered

    PGSinkMultipleError

    Property        Description
    message         Original error message
    chunks          An array of { chunk: Message, encoding: string } objects
    originalError   The original error object that was triggered

    Example error handling:

    import { PGSinkStream, PGSinkError, PGSinkMultipleError } from '@ovotech/kafka-pg-sink';
     
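    // Constructor options ({ pg, topics }) omitted for brevity; see the examples above
    const pgSink = new PGSinkStream();
     
    pgSink.on('error', (error: PGSinkError | PGSinkMultipleError) => {
      if (error instanceof PGSinkError) {
        // A single message failed to save
        console.log(error.chunk);
      } else {
        // A batch failed: each entry is { chunk, encoding }
        console.log(error.chunks);
      }
    });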
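If you want to treat both error types uniformly, for example to log or re-queue every message that failed, the two shapes can be flattened into one list. The sketch below uses local interfaces that mirror only the properties documented in the tables above. `SingleErrorLike`, `MultipleErrorLike` and `failedMessages` are hypothetical names, not exports of the package.

```typescript
// Minimal local shapes based on the documented error properties.
interface Message {
  topic: string;
  value: any;
  offset?: number;
  partition?: number;
}

interface SingleErrorLike {
  message: string;
  chunk: Message;
  encoding: string;
}

interface MultipleErrorLike {
  message: string;
  chunks: { chunk: Message; encoding: string }[];
}

// Returns every message involved in the failure, whichever error shape was emitted.
export function failedMessages(error: SingleErrorLike | MultipleErrorLike): Message[] {
  return 'chunks' in error ? error.chunks.map((item) => item.chunk) : [error.chunk];
}
```

Checking for the `chunks` property rather than using `instanceof` keeps the sketch independent of the library's classes; in real code `instanceof PGSinkError` works as in the example above.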

    Gotchas

    Be aware that node streams unpipe in the event of an error, so you'll need to provide your own error handling and repipe the streams if you want the pipeline to be resilient to errors.
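One possible way to handle this, sketched with plain Node streams only: build the sink through a factory and pipe into a fresh instance whenever the current one errors. In recent Node versions an errored writable is usually destroyed and cannot be reused, which is why the sketch creates a new sink instead of re-piping the old one. `pipeWithRepipe`, `createSink` and `maxRetries` are hypothetical names chosen for illustration. The sketch does not replay the message that caused the error.

```typescript
import { Readable, Writable } from 'stream';

// Pipes `source` into a sink built by `createSink`. When the sink emits an
// error, the error is reported and a fresh sink is created and piped again,
// at most `maxRetries` times.
export function pipeWithRepipe(
  source: Readable,
  createSink: () => Writable,
  onError: (error: Error) => void,
  maxRetries = 3,
): void {
  let retries = 0;

  const connect = (): void => {
    const sink = createSink();
    sink.once('error', (error: Error) => {
      onError(error);
      // pipe() has normally unpiped already; calling unpipe again is harmless
      source.unpipe(sink);
      if (retries < maxRetries) {
        retries += 1;
        connect();
      }
    });
    source.pipe(sink);
  };

  connect();
}
```

With this package, `createSink` would presumably return a new PGSinkStream built from your options, and `source` would be the consumer stream or the deserializer. Whether several sink instances can safely share one pg client is not covered here, so treat that part as an assumption to verify.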

    Running the tests

    The tests require a running postgres database. This is easily set up with docker-compose from the project root folder:

    docker-compose up

    Then you can run the tests with:

    yarn test

    Coding style (linting, etc) tests

    Style is maintained with prettier and tslint:

    yarn lint
    

    Deployment

    Deployment is performed by lerna automatically on merge / push to master, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.

    Contributing

    Have a bug? File an issue with a simple example that reproduces this so we can take a look & confirm.

    Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see test folder).

    License

    This project is licensed under Apache 2 - see the LICENSE file for details
