# kafka-executor

Listens to Kafka topics and executes asynchronous functions to process each Kafka message, ensuring that all processing succeeds before the corresponding message offset is committed.
- [Installation](#installation)
- [Usage](#usage)
- [Documentation](#documentation)

## Features

- Simple API
- Ensures that every job completes successfully before a message is committed
- Retry strategy for jobs that fail
- Graceful shutdown
## Installation

Install with yarn or npm:

```sh
yarn add kafka-executor
# or
npm install kafka-executor --save
```
## Usage

### Basic

```js
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
  brokerList: '0.0.0.0:9092,0.0.0.0:9091',
  topics: ['topic1', 'topic2'],
  groupId: 'groupId',
});

executor.init();

executor.addJob('myJobId', new Job((kafkaMessage) => {
  console.log(kafkaMessage);
  return Promise.resolve();
}));
```
## Documentation

### Job

```ts
import { Job } from 'kafka-executor';

new Job(() => Promise.resolve(), {
  maxRetries?: number,
  retryDelay?: number | ((retryNumber: number) => number),
  shouldRetry?: boolean | ((err: Error) => boolean),
});
```
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| `maxRetries: number` | no | 3 | How many times to retry a job before it is considered failed |
| `retryDelay: number \| (retryIndex) => number` | no | 60000 ms | The delay between retries, in ms |
| `shouldRetry: boolean \| (error) => boolean` | no | true | Whether a job should be retried when it fails |
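Because `retryDelay` also accepts a function of the retry index, a backoff strategy can be expressed directly. A minimal exponential-backoff sketch (the function itself is illustrative, not part of the library; the 60000 ms cap mirrors the default delay):

```javascript
// Exponential backoff with a 60s cap: 1s, 2s, 4s, 8s, ... up to 60s.
const retryDelay = (retryIndex) => Math.min(1000 * 2 ** retryIndex, 60000);

// Passed as a Job option (sketch):
// new Job(handler, { maxRetries: 5, retryDelay });
```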
### KafkaExecutor

#### Options

```ts
import KafkaExecutor from 'kafka-executor';

new KafkaExecutor({
  brokerList: string;
  groupId: string;
  topics: string[];
  connectionTimeout?: number;
  checkInterval?: number;
  batchSize?: number;
  errorHandler?: (err: Error[], message: KafkaMessage, commit: Function) => void;
  logger?: (message: string, type: LogType, code?: string) => void;
  maxRetries?: number;
  retryDelay?: number;
  consumer?: object;
});
```
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| `brokerList: string` | yes | - | Initial list of brokers as a CSV list of broker `host` or `host:port` |
| `topics: string[]` | yes | - | The topics that the consumer will listen to |
| `groupId: string` | yes | - | Client group id string. All clients sharing the same `group.id` belong to the same group |
| `checkInterval: number` | no | 2000 | How long to wait, in ms, before checking for new messages during idle periods |
| `batchSize: number` | no | 1 | How many messages to process concurrently. Change this according to your error tolerance |
| `errorHandler: (errors, kafkaMessage, commit: Function) => void` | no | - | A function responsible for handling job errors. By default the process will exit with code 1 |
| `logger: (message: string, type: 'info' \| 'warn' \| 'error', code) => void` | no | console | A function responsible for logging |
| `consumer: object` | no | - | Options for the consumer; see the rdkafka configuration options |
| `maxRetries: number` | no | 3 | Global default for all jobs |
| `retryDelay: number` | no | 60000 ms | Global default for all jobs |
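A custom `errorHandler` can override the exit-on-failure default, for example by committing messages whose jobs all failed with non-retryable client errors. A hypothetical sketch (the `status` check assumes the error shape documented below; the routing policy is illustrative, not the library's behavior):

```javascript
// Commit the message when every failure is a client error (4xx),
// otherwise rethrow so the failure is not silently swallowed.
const errorHandler = (errors, kafkaMessage, commit) => {
  const allClientErrors = errors.every(
    (err) => err.status && String(err.status).startsWith('4')
  );
  if (allClientErrors) {
    commit();
  } else {
    throw errors[0];
  }
};
```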
#### Functions

```js
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
  brokerList: '0.0.0.0:9092',
  groupId: 'group',
  topics: ['topic'],
});

executor.addJob('myJobId', new Job(...));
executor.init();
executor.removeJob('myJobId');
executor.on('event', Function);
executor.shutdown();
```
| Name | Description |
| --- | --- |
| `init: () => Promise` | Initializes the kafka-executor and connects the consumer to Kafka |
| `addJob: (jobId: string, job: Job) => void` | Adds a job to the processing flow |
| `removeJob: (jobId: string) => void` | Removes a job |
| `on: (event: string, handler: Function) => void` | Listens to a variety of events emitted by kafka-executor and rdkafka |
| `shutdown: () => Promise` | Shuts down the process gracefully, ensuring that pending jobs finish before exit |
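Graceful shutdown pairs naturally with process signals. A minimal sketch (the wiring helper is not part of the library; it only assumes `shutdown()` returns a promise as documented, and takes an `exit` function so it can be exercised without terminating a real process):

```javascript
// On SIGTERM/SIGINT, let pending jobs finish, then exit.
const installShutdownHandler = (executor, exit = process.exit) => {
  const handler = () =>
    executor.shutdown().then(() => exit(0)).catch(() => exit(1));
  process.once('SIGTERM', handler);
  process.once('SIGINT', handler);
  return handler;
};

// Usage (sketch): installShutdownHandler(executor);
```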
#### Events

| Event | Arguments | Description |
| --- | --- | --- |
| `message.received` | `kafkaMessage[]` | Fires when the consumer receives messages |
| `message.committed` | `kafkaMessage` | Fires when the consumer commits a message |
| `processing.error` | `kafkaMessage, error` | Fires when one or more jobs fail |
| `shutdown` | - | Fires when the kafka-executor shuts down |
#### node-rdkafka events

| Event | Description |
| --- | --- |
| `data` | When using the Standard API, consumed messages are emitted in this event |
| `disconnected` | Emitted when the broker disconnects. This event is only emitted when `.disconnect` is called; the wrapper will always try to reconnect otherwise |
| `ready` | Emitted when the consumer is ready to read messages |
| `event` | Emitted when librdkafka reports an event (if you opted in via the `event_cb` option) |
| `event.log` | Emitted when logging events occur (if you opted in for logging via the `event_cb` option). You will need to set a value for `debug` if you want information to be sent |
| `event.stats` | Emitted when librdkafka reports stats (if you opted in by setting `statistics.interval.ms` to a non-zero value) |
| `event.throttle` | Emitted when librdkafka reports throttling |
#### kafkaMessage

```ts
{
  value: Buffer,
  size: number,
  topic: string,
  offset: number,
  partition: number,
  key: string,
  timestamp: number
}
```
| Name | Type | Description |
| --- | --- | --- |
| `value` | Buffer | Message contents as a Buffer |
| `size` | number | Size of the message, in bytes |
| `topic` | string | Topic the message comes from |
| `offset` | number | Offset the message was read from |
| `partition` | number | Partition the message was on |
| `key` | string | Key of the message, if present |
| `timestamp` | number | Timestamp of message creation |
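Since `value` arrives as a Buffer, jobs typically decode it before use. A sketch for UTF-8 JSON payloads (the helper name is illustrative, not part of the library):

```javascript
// Decode a kafkaMessage value as UTF-8 JSON; callers decide how to
// handle malformed payloads via the thrown SyntaxError.
const parseMessage = (kafkaMessage) =>
  JSON.parse(kafkaMessage.value.toString('utf8'));

// Usage inside a job (sketch):
// new Job((kafkaMessage) => {
//   const payload = parseMessage(kafkaMessage);
//   return handle(payload);
// });
```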
#### error

```ts
{
  ...Error,
  jobId: string,
  status?: string,
}
```

| Name | Type | Description |
| --- | --- | --- |
| `jobId` | string | The id of the failed job |
| `status` | string | The HTTP status, if one exists |
#### codes

| Name | Description |
| --- | --- |
| `kafkaError` | Log produced by Kafka |
| `connectionError` | Log produced when trying to connect to Kafka |
| `jobFailed` | Log produced when a job fails |
| `jobRetry` | Log produced when a job retries |
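A custom `logger` can route entries by these codes. A sketch (the codes come from the table above; the routing policy itself is illustrative):

```javascript
// Send retry noise to debug-level output; route everything else to the
// console method matching the reported type ('info' | 'warn' | 'error').
const logger = (message, type, code) => {
  if (code === 'jobRetry') {
    console.debug(message);
    return;
  }
  (console[type] || console.log)(message);
};
```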