queue-schedule
TypeScript icon, indicating that this package has built-in type declarations

3.0.6 • Public • Published

Queue Shedule

NPM version Build status Test coverage

Kafka is a high avaliable message queue, but it lacks of consuming message with a slow speed. Some of task with no need to finish it at none, and we want to complete it with a small cost. This is just the reason why we develop Queue Shedule.

Install

npm install queue-schedule

How to use

Use rdkafka

const Kafka = require('node-rdkafka');
const {RdKafkaProducer,RdKafkaConsumer} = require('queue-schedule');
const producerRd = new Kafka.HighLevelProducer({
    'metadata.broker.list': KAFKA_HOST,
    'linger.ms':0.1,
    'queue.buffering.max.ms': 500,
    'queue.buffering.max.messages':1000,
    // debug: 'all'
});
producerRd.on('event.error',function(err) {
    slogger.error('producer error');
});
producerRd.on('event.log',function(log) {
    slogger.debug('producer log',log);
});
const producer = new RdKafkaProducer({
    name : SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    producer:producerRd,
    delayInterval: 500
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        slogger.error('write to queue error',err);
        return done('write to queue error');
    }
    slogger.info('write to kafka finished');
});


const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': KAFKA_HOST,
    'group.id': 'test-rdkafka-0',
    'auto.offset.reset':'earliest',
    'socket.keepalive.enable': true,
    'socket.nagle.disable': true,
    'enable.auto.commit': true,
    'fetch.wait.max.ms': 5,
    'fetch.error.backoff.ms': 5,
    'queued.max.messages.kbytes': 1024 * 10,
    debug:'all'
});
let hasDone = false;
new RdKafkaConsumer({
    name: 'kafka',
    consumer,
    topics: [ TOPIC_NAME1],
    
    doTask:function(messages,callback) {
        slogger.trace(messages);
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(RdKafkaConsumer.EVENT_CONSUMER_ERROR,function(err) {
    slogger.error('consumer error',err);
    hasDone = true;
    done(err);
}).on(RdKafkaConsumer.EVENT_CLIENT_READY,function() {
    slogger.trace('the consumer client is ready');
    
}).on(RdKafkaConsumer.EVENT_LOG,function(log) {
    // slogger.trace(JSON.stringify(log));
});

Using kafkajs

const { Kafka } = require('kafkajs');
const {KafkaJsProducer,KafkaJsConsumer} = require('queue-schedule');

const FIST_DATA = {a:1,b:2};
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.kafkajs';
const client =  new Kafka({
    brokers: ['xxxx', 'yyyy']
});

const producer = new KafkaJsProducer({
    topic: TOPIC_NAME1,
    client,
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return;
    }
    console.info('write to kafka finished');
});
producer.on(KafkaJsProducer.EVENT_PRODUCER_ERROR, function(err) {
    console.error('error in consumer', err);
});

new KafkaJsConsumer({
    name: 'kafka',
    client,
    topic: TOPIC_NAME1,
    consumerOption: {
        groupId: 'kafkajs',
        fromBeginning: true
    },
    doTask:function(messages,callback) {
        console.log(messages);
        const value = messages[0].value;//read the first value
        let data = null;
        try {
            data = JSON.parse(value);
            console.log('recieve data',data);
        } catch (e) {
            console.error('parse message error',e);
        }

        callback();//the next loop
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(KafkaJsConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
    hasDone = true;
    done(err);
}).on(KafkaJsConsumer.EVENT_CONSUMER_READY,function() {
    console.log('the consumer is ready');
});

API

For detail usage, see the document online here.

License

MIT

Dependencies (0)

    Dev Dependencies (10)

    Package Sidebar

    Install

    npm i queue-schedule

    Weekly Downloads

    6

    Version

    3.0.6

    License

    MIT

    Unpacked Size

    87.1 kB

    Total Files

    39

    Last publish

    Collaborators

    • whyun-master