firequeue

2.2.11 • Public • Published

The API has been completely redesigned. All methods return streams that you can pipe, combine, or fork to accommodate your needs.

Platform Compatibility

The queue engine requires Node 0.11.x or greater, and the --harmony flag to get access to generators.

Creating jobs, however, is as simple as pushing a job payload to the queue's incoming node.

UI

Check out https://github.com/jogabo/firequeue-ui (not ported to v1 yet).

Examples

// run ./example.js
 
import firequeue from 'firequeue'
import { concat, throughSync } from 'stream-util'
import parallel from 'concurrent-transform'
 
const queue = firequeue.init('https://firequeue-test.firebaseio.com')
const logger = (fn) => throughSync(function(data) {
  console.log(fn(data))
  this.push(data)
})
 
// create some jobs
queue.jobs.push({ task: 'task1', data: { name: 'job1' } })
queue.jobs.push({ task: 'task1', data: { name: 'job2' } })
queue.jobs.push({ task: 'task1', data: { name: 'job3' }, delayed: '2s' })
queue.jobs.push({ task: 'task2', data: { name: 'job4' } })
 
// create job and listen to job updates
queue.jobs
  .push({ task: 'task3', data: { name: 'job5' } })
  .child('state').on('value', (s) => console.log(`job changed state to ${s.val()}`))
 
// log 'job changed state to queued'
// log 'job changed state to activated'
// log 'job changed state to completed'
 
// start queue engine
queue
  .start()
  .pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
 
// log task: task1, job: job1, state: queued
// log task: task1, job: job2, state: queued
// log task: task1, job: job3, state: delayed
// ...
 
// process task1
const task1 = queue
  .read('task1')
  .pipe(queue.process((job) => {
    // do some work with job.key(), job.val()
    return Promise.resolve()
  }))
  .pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
 
// log task: task1, job: job1, state: completed
// log task: task1, job: job2, state: completed
// ...
 
// process task2 with maxAttempts and backoff
const task2 = queue
  .read('task2')
  .pipe(queue.maxAttempts(2))
  .pipe(queue.backoff('2s'))  // wait 2s before retrying
  .pipe(queue.process((job) => {
    console.log('do some work with', job.key(), job.val())
    const attempts = job.child('attempts').val() || 0
    return attempts < 2
      ? Promise.reject()
      : Promise.resolve()
  }))
 
// process task3 with a concurrency of 10
const task3 = queue
  .read('task3')
  .pipe(parallel(queue.process((job) => {
    console.log('do some work with', job.key(), job.val())
    return Promise.resolve()
  }), 10))
 
 
// remove completed jobs
concat(task1, task2, task3)
  .pipe(queue.clean('completed'))
  .pipe(logger(({ task, key, state }) => `task: ${task}, job: ${key}, state: ${state}`))
 
// log task: task1, job: job1, state: cleaned
// log task: task1, job: job2, state: cleaned
// ...
 
// remove failed jobs after 1 day
queue
  .readJobsByStateWithDelay('failed', '1d')
  .on('data', (snap) => snap.ref().remove())
 
// 30sec later...
setTimeout(() => {
  queue.stop().then(() => {
    console.log('queue was stopped successfully')
  })
}, 30000)

API

init(ref: Firebase|String)

Creates a new queue at this Firebase location.

start()

Starts the queue and returns a stream of { task, state, key } objects.

stop()

Stops the queue and returns a Promise that resolves when all the currently active jobs have run.

read(taskName: String)

Creates a readable stream of taskName jobs.

process(fn|fn*)

Creates a transform stream that processes the jobs of a task. fn should return a yieldable object (https://github.com/tj/co#yieldables).

maxAttempts(n: Int)

Creates a transform stream that defines the number of attempts before a job is marked as failed.

backoff(time: String|Date)

Creates a transform stream that defines the time to wait before retrying a failed job.
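
How maxAttempts and backoff might interact can be pictured with a small standalone sketch. It does not use firequeue: `parseDelay` and `nextState` are hypothetical helpers. The unit table and the 'delayed'/'failed' decision rule are assumptions inferred from the '2s' and '1d' strings in the examples above, not the library's actual implementation. Only string delays are handled here, although backoff also accepts a Date.

```javascript
// Hypothetical helper: convert a delay such as '2s' or '1d' to milliseconds.
// The supported units are an assumption, not firequeue's documented grammar.
const UNITS = { ms: 1, s: 1000, m: 60 * 1000, h: 60 * 60 * 1000, d: 24 * 60 * 60 * 1000 }

function parseDelay(delay) {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(delay)
  if (!match) throw new Error(`unsupported delay: ${delay}`)
  return Number(match[1]) * UNITS[match[2]]
}

// Hypothetical decision rule: after a failure, retry until maxAttempts
// failed attempts have been recorded, waiting `backoff` between tries.
function nextState(job, { maxAttempts, backoff }, now = Date.now()) {
  const attempts = (job.attempts || 0) + 1
  return attempts >= maxAttempts
    ? { state: 'failed', attempts, failedAt: now }
    : { state: 'delayed', attempts, retryAt: now + parseDelay(backoff) }
}

// First failure is rescheduled, second failure is final.
console.log(nextState({ attempts: 0 }, { maxAttempts: 2, backoff: '2s' }, 0))
console.log(nextState({ attempts: 1 }, { maxAttempts: 2, backoff: '2s' }, 0))
```

Passing `now` explicitly keeps the rule deterministic, which makes it easy to check in isolation.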

clean(fn|string)

Creates a transform stream that removes jobs that match the filter.
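
The fn|string signature suggests that a filter can be given either as a predicate or as a state name, as in `queue.clean('completed')` above. A minimal sketch of that normalization, assuming a string is compared against the job state; `toFilter` is a hypothetical helper and is not part of firequeue:

```javascript
// Hypothetical helper: accept either a state name or a predicate function
// and always return a predicate over { task, key, state } objects.
function toFilter(filter) {
  if (typeof filter === 'function') return filter
  if (typeof filter === 'string') return (job) => job.state === filter
  throw new TypeError('filter must be a function or a string')
}

// Sample records shaped like the objects emitted by the streams above.
const jobs = [
  { task: 'task1', key: 'job1', state: 'completed' },
  { task: 'task1', key: 'job2', state: 'failed' },
  { task: 'task2', key: 'job4', state: 'completed' }
]

// Select by state name, then by an arbitrary predicate.
const byState = jobs.filter(toFilter('completed')).map((j) => j.key)
const byFn = jobs.filter(toFilter((j) => j.task === 'task2')).map((j) => j.key)
console.log(byState, byFn)
```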

Firebase structure

The Firebase backend is structured as follows:

-jobs (list of all the jobs / priority = -job.createdAt / indexedOn state)
 |-job1: {}
 |-job2: {}
 |-job3: {}
 |-job4: {}
-tasks (list of queued jobs referenced by task)
 |- task-name (priority = job.priority)
   |- job1: true
   |- job2: true
-delayed (list jobs events)
   |- job3: true
   |- job4: true
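
To make the layout concrete, here is the same tree as a plain JavaScript object, with a helper that resolves the job records referenced under a task. The field values are made up for illustration, and `queuedJobs` is a hypothetical helper that works on an in-memory snapshot rather than on a Firebase reference.

```javascript
// Illustrative snapshot of the three top-level nodes; values are made up.
const tree = {
  jobs: {
    job1: { task: 'task1', state: 'queued', data: { name: 'job1' } },
    job2: { task: 'task1', state: 'queued', data: { name: 'job2' } },
    job3: { task: 'task1', state: 'delayed', data: { name: 'job3' } },
    job4: { task: 'task2', state: 'delayed', data: { name: 'job4' } }
  },
  tasks: { task1: { job1: true, job2: true } },
  delayed: { job3: true, job4: true }
}

// Hypothetical helper: follow the references stored under tasks/<taskName>
// back to the full records stored under jobs.
function queuedJobs(snapshot, taskName) {
  const refs = snapshot.tasks[taskName] || {}
  return Object.keys(refs).map((key) => ({ key, ...snapshot.jobs[key] }))
}

console.log(queuedJobs(tree, 'task1').map((job) => job.key))
```

Storing only `true` under tasks and delayed keeps those lists small; the job payload lives in one place under jobs.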

job structure

- data (Object)             recommended path to store job payload
- attempts (Number)         number of failed attempts
- createdAt (Timestamp)     creation timestamp
- queuedAt (Timestamp)      queued timestamp
- completedAt (Timestamp)   completed timestamp
- failedAt (Timestamp)      failed timestamp

Install

npm i firequeue

License

ISC

Collaborators

  • pgherveou2