node-celery

Celery client for Node

Celery client for Node.js

Celery is an asynchronous task/job queue based on distributed message passing. node-celery allows to queue tasks from Node.js. If you are new to Celery check out http://celeryproject.org/

Simple example, included as examples/hello-world.js:

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
        CELERY_RESULT_BACKEND: 'amqp'
    });
 
client.on('error', function(err) {
    console.log(err);
});
 
client.on('connect', function() {
    client.call('tasks.echo', ['Hello World!'], function(result) {
        console.log(result);
        client.end();
    });
});

Note: When using AMQP as resultbackend with celery prior to version 3.1.7 the result queue needs to be non durable or it will fail with a: Queue.declare: (406) PRECONDITION_FAILED.

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_TASK_RESULT_DURABLE: false
    });

The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed:

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
    });
 
client.on('connect', function() {
    client.call('send-email', {
        to: 'to@example.com',
        title: 'sample email'
    }, {
        eta: new Date(Date.now() + 60 * 60 * 1000) // an hour later 
    });
});

The expires argument defines an optional expiry time, a specific date and time using Date:

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
    });
 
client.on('connect', function() {
    client.call('tasks.sleep', [2 * 60 * 60], null, {
        expires: new Date(Date.now() + 60 * 60 * 1000) // expires in an hour 
    });
});

The backend is used to store task results. Currently AMQP (RabbitMQ) and Redis backends are supported.

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
        CELERY_RESULT_BACKEND: 'redis://localhost/0'
    });
 
client.on('connect', function() {
    var result = client.call('tasks.add', [1, 2]);
    setTimout(function() {
        result.get(function(data) {
            console.log(data); // data will be null if the task is not finished 
        });
    }, 2000);
});

AMQP backend allows to subscribe to the task result and get it immediately, without polling:

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
        CELERY_RESULT_BACKEND: 'amqp'
    });
 
client.on('connect', function() {
    var result = client.call('tasks.add', [1, 2]);
    result.on('ready', function(data) {
        console.log(data);
    });
});

The simplest way to route tasks to different queues is using CELERY_ROUTES configuration option:

var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
        CELERY_ROUTES: {
            'tasks.send_mail': {
                queue: 'mail'
            }
        }
    }),
    send_mail = client.createTask('tasks.send_mail'),
    calculate_rating = client.createTask('tasks.calculate_rating');
 
client.on('error', function(err) {
    console.log(err);
});
 
client.on('connect', function() {
    send_mail.call([], {
        to: 'to@example.com',
        title: 'hi'
    }); // sends a task to the mail queue 
    calculate_rating.call([], {
        item: 1345
    }); // sends a task to the default queue 
});