Kafka Docker Wrapper
Running
- Run npm install
- Run docker-compose up
You should now see the producer tab publishing new messages and the consumer tab reading them. You can start more producers (while still running one consumer) to see more messages coming in.
Topic Errors
If you get an error about the topic not existing, run docker-compose rm to delete the containers, then run docker-compose up again.
Use
Install
$ npm install kafka-node-wrapper --save
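const Kafka = require('kafka-node-wrapper'); // Node < 9.x
// import * as Kafka from 'kafka-node-wrapper' // Node > 9.x

// Assumed API, not confirmed by this README: Producer, connect(), on(), and all event names.
const consumer = new Kafka.Consumer();
const producer = new Kafka.Producer();

// Connect with producer and consumer in parallel
function connect() {
  return Promise.all([consumer.connect(), producer.connect()]);
}

function events() {
  // Consumer Events
  consumer.on('message', (message) => console.log(message));
  consumer.on('error', (err) => console.error(err));

  // Producer Events
  producer.on('delivery-report', (report) => console.log(report));
  producer.on('error', (err) => console.error(err));
}

// DEMO!
connect().then(events);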
Configuration
All
Configurations custom to this wrapper
Field | Description | Type | Default |
---|---|---|---|
throttle | Throttle interval time (ms) | Number | 500 |
topics | Topics to subscribe to | String[] | ['kafka-test-topic'] |
autoInterval | Enable automatic intervals for polling (producer) and consuming (consumer). | Boolean | true |
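
As a rough illustration of how these defaults could combine with user-supplied options, the sketch below merges a partial config over the values listed in the table. `DEFAULTS` and `resolveConfig` are hypothetical names made up for this example; they are not part of the wrapper's API, and the wrapper may merge options differently.

```javascript
// Hypothetical helper, NOT part of kafka-node-wrapper.
// Default values are copied from the configuration table above.
const DEFAULTS = Object.freeze({
  throttle: 500,
  topics: ['kafka-test-topic'],
  autoInterval: true,
});

function resolveConfig(overrides) {
  // Later sources win, so user options replace the defaults key by key.
  const config = Object.assign({}, DEFAULTS, overrides || {});
  // Copy the array so callers cannot mutate the shared default list.
  config.topics = config.topics.slice();
  return config;
}
```

For example, `resolveConfig({ throttle: 100 })` keeps the default topics and `autoInterval` while lowering the interval to 100 ms.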
Consumer
Configurations custom to this wrapper's Consumer class.
Field | Description | Type | Default |
---|---|---|---|
consumeMax | Number of messages to consume for each interval. | Number | 1 |
The rest of the configuration is described here.
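
To make the meaning of consumeMax concrete: with the defaults, each interval (every 500 ms) would handle at most one message. The sketch below models a single interval against an in-memory queue. `consumeBatch` is a hypothetical stand-in written for this example and assumes simple front-of-queue batching; it is not the wrapper's implementation.

```javascript
// Hypothetical model of one consume interval, NOT the wrapper's code.
// Removes and returns at most `consumeMax` messages from the front of the queue.
function consumeBatch(queue, consumeMax = 1) {
  return queue.splice(0, consumeMax);
}

const pending = ['a', 'b', 'c'];
const first = consumeBatch(pending);      // default consumeMax of 1
const second = consumeBatch(pending, 10); // asks for 10, only 2 remain
```

Raising consumeMax lets a consumer drain a backlog in fewer intervals, at the cost of larger bursts of work per interval.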
Examples
Basic Usage
Sample code for a simple pub/sub setup.
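const Kafka = require('kafka-node-wrapper');

// Assumed API, not confirmed by this README: Producer, connect(), on(), send(), and all event names.
const consumer = new Kafka.Consumer();
const producer = new Kafka.Producer();

/**
 * @param con
 * @return
 */
function consumerEvent(con) {
  return () => {
    con.on('error', (err) => console.error(err));
    con.on('message', (message) => console.log(message));
    console.log('Consumer connected');
  };
}

/**
 * @param pro
 * @return
 */
function producerEvent(pro) {
  return () => {
    pro.on('error', (err) => console.error(err));
    pro.on('delivery-report', (report) => console.log(report));
    console.log('Producer connected');
    const message = { foo: 1, bar: 2 };
    pro.send(message);
  };
}

function run() {
  Promise.all([consumer.connect(), producer.connect()])
    .then(consumerEvent(consumer))
    .then(producerEvent(producer));
}

run();

module.exports = {
  consumer: consumer,
  producer: producer,
  consumerEvent: consumerEvent,
  producerEvent: producerEvent
};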
Overriding
Sample code for doing your own polling or commit/consume.
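const KafkaWrapper = require('kafka-node-wrapper');

/* ARBITRARY EXTERNAL CODE BEGIN */
/**
 * This callback type is called `requestCallback` and is displayed as a global symbol.
 * @callback requestCallback
 * @param {{name: string, age: number}} responseCode
 */
/**
 * Some stuff that has a callback (maybe write to a database or whatever)
 * @param data
 * @param {requestCallback} cb
 */
function doStuffCB(data, cb) {
  setTimeout(() => cb(data), 100);
}

/**
 * Promise Wrapper for our doStuffCB function
 * @param data
 * @return {Promise<{name: string, age: number}>}
 */
function doStuff(data) {
  return new Promise((resolve) => {
    doStuffCB(data, resolve);
  });
}

/**
 * Does stuff
 * @param data
 * @return {Promise<{name: string, age: number}>}
 */
function doMoreStuff(data) {
  return new Promise((resolve) => {
    doStuffCB(data, resolve);
  });
}
/* ARBITRARY EXTERNAL CODE END */

// Assumed names, not confirmed by this README: the Override class and its methods,
// Producer, connect(), send(), consume(), on(), and all event names.
class Override {
  constructor() {
    this._interval = null;
    this.consumer = new KafkaWrapper.Consumer({
      consumeMax: 10,
      autoInterval: false, // we must consume/commit ourselves
      topics: ['upload-user']
    });
    this.producer = new KafkaWrapper.Producer({ topics: ['upload-user'] });
  }

  /**
   * Run Service
   * @param message
   * @return
   */
  async run(message) {
    // Connect
    await this.consumer.connect();
    await this.producer.connect();
    return await this.start(message);
  }

  async start(message) {
    // Send message to publish something
    await this.producer.send(message);
    // Start consuming 1 message every 500 ms
    this.startInterval();
    // Listen to Messages
    this.consumer.on('message', (msg) => this.process(msg));
    // Logging
    this.consumer.on('error', (err) => console.error(err));
    // End scenario based on some condition
    return new Promise((resolve) => {
      setTimeout(() => {
        this.stopInterval();
        resolve();
      }, 20000); // 20 seconds
    });
  }

  /**
   * Process Function
   * @param message
   * @return
   */
  async process(message) {
    const user1 = await doStuff(message); // Step 1
    const user2 = await doMoreStuff(message); // Step 2
    const output = { user1: user1, user2: user2 };
    return await this.producer.send(output); // Step 3 - Send
  }

  startInterval() {
    this._interval = setInterval(() => this.consumer.consume(1), 500);
  }

  stopInterval() {
    clearInterval(this._interval);
  }
}

const override = new Override();
override.run({ name: 'test', age: 1 });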
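
When autoInterval is disabled, the calling code has to drive consumption on its own schedule. The sketch below shows one way to wrap that in a small start/stop helper. `createManualLoop` and its `tick` callback are hypothetical names for this example; what `tick` actually calls on the consumer is left open here because it depends on the wrapper's API.

```javascript
// Hypothetical helper, NOT part of kafka-node-wrapper.
// Runs `tick` every `intervalMs` milliseconds until stop() is called.
function createManualLoop(tick, intervalMs = 500) {
  let timer = null;
  return {
    start() {
      // Guard against stacking several timers if start() is called twice.
      if (timer === null) timer = setInterval(tick, intervalMs);
    },
    stop() {
      clearInterval(timer);
      timer = null;
    },
    isRunning() {
      return timer !== null;
    },
  };
}

// Usage: `tick` would trigger one round of consuming.
const loop = createManualLoop(() => console.log('consume one batch'));
loop.start();
loop.stop(); // stopped right away so this example exits immediately
```

Keeping the timer handle private means only start() and stop() can change it, which avoids leaked intervals that would keep the Node process alive.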