Kafka Client (http://incubator.apache.org/kafka/)
A Node.js client for Kafka
```js
var Kafka = require('franz-kafka')

var kafka = new Kafka({
	zookeeper: 'localhost:2181',
	compression: 'gzip',
	queueTime: 2000,
	batchSize: 200,
	logger: console
})

kafka.on('connect', function () {
	// topics are Streams
	var foo = kafka.topic('foo')
	var bar = kafka.topic('bar')

	// consume with a pipe
	foo.pipe(process.stdout)

	// or with the 'data' event
	foo.on('data', function (data) { console.log(data) })

	// produce with a pipe
	process.stdin.pipe(bar)

	// or just write to it
	bar.write('this is a message')

	// resume your consumer to get it started
	foo.resume()

	// don't forget to handle errors
	foo.on('error', function (err) { console.error("STAY CALM") })
})
```
To test the example, first get Kafka running (follow steps 1 and 2 of the quick start guide). Then run `node example.js` to see messages getting produced and consumed.
```js
var Kafka = require('franz-kafka')

var kafka = new Kafka({
	brokers: [{ // an array of broker connection info
		id: 0,  // the server's broker id
		host: 'localhost',
		port: 9092
	}],
	// producer defaults
	compression: 'none',      // default compression for producing
	maxMessageSize: 1000000,  // limits the size of a produced message
	queueTime: 5000,          // milliseconds to buffer batches of messages before producing
	batchSize: 200,           // number of messages to bundle before producing
	// consumer defaults
	minFetchDelay: 0,         // minimum milliseconds to wait between fetches
	maxFetchDelay: 10000,     // maximum milliseconds to wait between fetches
	maxFetchSize: 300 * 1024, // limits the size of a fetched message
	logger: null              // a logger that implements global.console (for debugging)
})
```
`brokers`: An array of connection info for all the brokers this client can communicate with.
`compression`: The compression used when producing to Kafka. May be 'none', 'gzip', or 'snappy'.
`maxMessageSize`: The largest size of a message produced to Kafka. If a message exceeds this size, the Topic will emit an 'error'. Note that `batchSize` affects the size of produced messages, because each batch of messages is bundled and produced as a single message.
`queueTime`: The time in milliseconds to buffer messages for bundling before producing to Kafka. This option is combined with `batchSize`; whichever limit is reached first triggers a produce.
`batchSize`: The number of messages to bundle before producing to Kafka. This option is combined with `queueTime`; whichever limit is reached first triggers a produce.
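The interaction between `queueTime` and `batchSize` can be sketched with a tiny batcher. This is an illustration only, not this client's internals; `Batcher`, `add`, and `flush` are hypothetical names:

```javascript
// Illustrative sketch: buffer messages until either batchSize is
// reached or queueTime milliseconds elapse, whichever comes first.
function Batcher(batchSize, queueTime, produce) {
	this.batchSize = batchSize
	this.queueTime = queueTime
	this.produce = produce // called with the bundled messages
	this.messages = []
	this.timer = null
}

Batcher.prototype.add = function (message) {
	this.messages.push(message)
	if (this.messages.length >= this.batchSize) {
		this.flush() // batchSize was reached first
	} else if (!this.timer) {
		// queueTime starts counting from the first buffered message
		this.timer = setTimeout(this.flush.bind(this), this.queueTime)
	}
}

Batcher.prototype.flush = function () {
	if (this.timer) {
		clearTimeout(this.timer)
		this.timer = null
	}
	if (this.messages.length > 0) {
		this.produce(this.messages)
		this.messages = []
	}
}
```

With `batchSize: 3`, adding a third message produces immediately; with fewer, the `queueTime` timer fires instead.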
`minFetchDelay`: The minimum time in milliseconds to wait between fetch requests to Kafka. When a fetch returns zero messages, the client begins exponential backoff between requests, up to `maxFetchDelay`, until messages are available.
`maxFetchDelay`: The maximum time in milliseconds to wait between fetch requests to Kafka after exponential backoff has begun.
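The backoff described above can be sketched as doubling the delay after each empty fetch, bounded by the two options. `nextFetchDelay` is a hypothetical helper, not part of this client:

```javascript
// Hypothetical helper: compute the delay before the next fetch after
// an empty fetch, doubling from minFetchDelay up to maxFetchDelay.
function nextFetchDelay(currentDelay, minFetchDelay, maxFetchDelay) {
	if (currentDelay === 0) {
		return Math.max(minFetchDelay, 1) // start the backoff
	}
	return Math.min(currentDelay * 2, maxFetchDelay)
}
```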
`maxFetchSize`: The maximum size of a fetched message. If a fetched message is larger than this size, the Topic will emit an 'error' event.
Connects to the Kafka cluster and runs the callback once connected.
Get a Topic for consuming or producing. The first argument is the topic name; the second is an options object.
```js
var foo = kafka.topic('foo', {
	// default options
	minFetchDelay: 0,      // defaults to kafka.minFetchDelay
	maxFetchDelay: 10000,  // defaults to kafka.maxFetchDelay
	maxFetchSize: 1000000, // defaults to kafka.maxFetchSize
	compression: 'none',   // defaults to kafka.compression
	batchSize: 200,        // defaults to kafka.batchSize
	queueTime: 5000,       // defaults to kafka.queueTime
	partitions: {
		consume: ['0-0:0'], // array of strings with the form 'brokerId-partitionId:startOffset'
		produce: ['0:1']    // array of strings with the form 'brokerId:partitionCount'
	}
})
```
`partitions`: This structure describes which brokers and partitions the client will connect to for producing and consuming.
`consume`: An array of partitions to consume, and the offset to begin consuming from, in the form 'brokerId-partitionId:startOffset'. For example, broker 2, partition 3, offset 5 is '2-3:5'.
`produce`: An array of brokers to produce to, with the count of partitions, in the form 'brokerId:partitionCount'. For example, broker 3 with 8 partitions is '3:8'.
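For illustration, the two string formats decompose like this. `parseConsume` and `parseProduce` are hypothetical helpers written for this sketch, not part of the client:

```javascript
// Hypothetical parser for 'brokerId-partitionId:startOffset', e.g. '2-3:5'
function parseConsume(str) {
	var parts = str.split(':')
	var ids = parts[0].split('-')
	return {
		brokerId: Number(ids[0]),
		partitionId: Number(ids[1]),
		startOffset: Number(parts[1])
	}
}

// Hypothetical parser for 'brokerId:partitionCount', e.g. '3:8'
function parseProduce(str) {
	var parts = str.split(':')
	return {
		brokerId: Number(parts[0]),
		partitionCount: Number(parts[1])
	}
}
```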
Fires when the client is connected to a broker.
A topic is a Stream that may be Readable for consuming and Writable for producing. Retrieve a topic from the kafka instance.
```js
var topic = kafka.topic('a topic')
```
Pause the consumer stream.
Resume the consumer stream.
Destroy the consumer stream.
Sets the encoding of the data emitted by the 'data' event.
Write a message to the topic. Returns false if the message buffer is full.
Pipe the stream of messages to the next Writable Stream.
Fires for each message. Data is a Buffer by default, or a string if an encoding has been set.
Fires when the producer stream can handle more messages.
Fires when there is a produce or consume error.