Noosphere Possibilities Maximized

    queue-schedule
    TypeScript icon, indicating that this package has built-in type declarations

    3.0.5 • Public • Published

    Queue Shedule

    NPM version Build status Test coverage

    Kafka is a high avaliable message queue, but it lacks of consuming message with a slow speed. Some of task with no need to finish it at none, and we want to complete it with a small cost. This is just the reason why we develop Queue Shedule.

    Install

    npm install queue-schedule

    How to use

    Use rdkafka

    const Kafka = require('node-rdkafka');
    const {RdKafkaProducer,RdKafkaConsumer} = require('queue-schedule');
    const producerRd = new Kafka.HighLevelProducer({
        'metadata.broker.list': KAFKA_HOST,
        'linger.ms':0.1,
        'queue.buffering.max.ms': 500,
        'queue.buffering.max.messages':1000,
        // debug: 'all'
    });
    producerRd.on('event.error',function(err) {
        slogger.error('producer error');
    });
    producerRd.on('event.log',function(log) {
        slogger.debug('producer log',log);
    });
    const producer = new RdKafkaProducer({
        name : SCHEDULE_NAME1,
        topic: TOPIC_NAME1,
        producer:producerRd,
        delayInterval: 500
    });
    producer.addData(FIST_DATA, {},function(err) {
        if (err) {
            slogger.error('write to queue error',err);
            return done('write to queue error');
        }
        slogger.info('write to kafka finished');
    });
    
    
    const consumer = new Kafka.KafkaConsumer({
        'metadata.broker.list': KAFKA_HOST,
        'group.id': 'test-rdkafka-0',
        'auto.offset.reset':'earliest',
        'socket.keepalive.enable': true,
        'socket.nagle.disable': true,
        'enable.auto.commit': true,
        'fetch.wait.max.ms': 5,
        'fetch.error.backoff.ms': 5,
        'queued.max.messages.kbytes': 1024 * 10,
        debug:'all'
    });
    let hasDone = false;
    new RdKafkaConsumer({
        name: 'kafka',
        consumer,
        topics: [ TOPIC_NAME1],
        
        doTask:function(messages,callback) {
            slogger.trace(messages);
        },
        readCount : 1,
        pauseTime : 500,
        idleCheckInter: 10 * 1000
    }).on(RdKafkaConsumer.EVENT_CONSUMER_ERROR,function(err) {
        slogger.error('consumer error',err);
        hasDone = true;
        done(err);
    }).on(RdKafkaConsumer.EVENT_CLIENT_READY,function() {
        slogger.trace('the consumer client is ready');
        
    }).on(RdKafkaConsumer.EVENT_LOG,function(log) {
        // slogger.trace(JSON.stringify(log));
    });

    Using kafkajs

    const { Kafka } = require('kafkajs');
    const {KafkaJsProducer,KafkaJsConsumer} = require('queue-schedule');
    
    const FIST_DATA = {a:1,b:2};
    const SCHEDULE_NAME1 = 'schedule1';
    const TOPIC_NAME1 = 'topic.kafkajs';
    const client =  new Kafka({
        brokers: ['xxxx', 'yyyy']
    });
    
    const producer = new KafkaJsProducer({
        topic: TOPIC_NAME1,
        client,
    });
    producer.addData(FIST_DATA, {},function(err) {
        if (err) {
            console.error('write to queue error',err);
            return;
        }
        console.info('write to kafka finished');
    });
    producer.on(KafkaJsProducer.EVENT_PRODUCER_ERROR, function(err) {
        console.error('error in consumer', err);
    });
    
    new KafkaJsConsumer({
        name: 'kafka',
        client,
        topic: TOPIC_NAME1,
        consumerOption: {
            groupId: 'kafkajs',
            fromBeginning: true
        },
        doTask:function(messages,callback) {
            console.log(messages);
            const value = messages[0].value;//read the first value
            let data = null;
            try {
                data = JSON.parse(value);
                console.log('recieve data',data);
            } catch (e) {
                console.error('parse message error',e);
            }
    
            callback();//the next loop
        },
        readCount : 1,
        pauseTime : 500,
        idleCheckInter: 10 * 1000
    }).on(KafkaJsConsumer.EVENT_CONSUMER_ERROR,function(err) {
        console.error('consumer error',err);
        hasDone = true;
        done(err);
    }).on(KafkaJsConsumer.EVENT_CONSUMER_READY,function() {
        console.log('the consumer is ready');
    });

    API

    For detail usage, see the document online here.

    License

    MIT

    Install

    npm i queue-schedule

    DownloadsWeekly Downloads

    20

    Version

    3.0.5

    License

    MIT

    Unpacked Size

    87.1 kB

    Total Files

    39

    Last publish

    Collaborators

    • yunnysunny