trailpack-tasker
Easily set up background workers with RabbitMQ and Trails. This project is built on top of the rabbot RabbitMQ client.
Forked from trailpack-tasker due to lack of maintanance.
Install
$ npm install --save trailpack-tasker2
Configure
Add Trailpack
// config/main.jsmoduleexports = packs: // ... other trailpacks
Configure Tasker Settings
// config/tasker.jsmoduleexports = /** * Define worker profiles. Each worker of a given type listens for the * "tasks" defined in its profile below. The task names represent a Task * defined in api.services.tasks. Note that 'memoryBound' and 'cpuBound' are * arbitrary names. */ profiles: memoryBound: tasks: 'hiMemoryTask1' cpuBound: tasks: 'VideoEncoder' 'hiCpuTask2' /** * Set RabbitMQ connection info */ connection: exchange: processenvTASKER_EXCHANGE // optional, defaults to `tasker-work-x` workQueueName: processenvTASKER_WORK_QUEUE // optional, defaults to `tasker-work-q` interruptQueueName: processenvTASKER_INTERRUPT_QUEUE // optional, defaults to `tasker-interrupt-q` /** * The RabbitMQ connection information. * See: https://www.rabbitmq.com/uri-spec.html */ host: processenvTASKER_RMQ_HOST user: processenvTASKER_RMQ_USER pass: processenvTASKER_RMQ_PASS port: processenvTASKER_RMQ_PORT vhost: processenvTASKER_RMQ_VHOST /** * Connection information could also be passed via uri */ uri: processenvTASKER_RMQ_URI /** * Additional, optional connection options (default values shown) */ heartbeat: 30 timeout: // this is the connection timeout (in milliseconds, per connection attempt), and there is no default failAFter: 60 // limits how long rabbot will attempt to connect (in seconds, across all connection attempts). Defaults to 60 retryLimit: 3 // limits number of consecutive failed attempts /** * Limit the amount of concurrent tasks, depending on the amount of workers increase this value. */ concurrentTasks: 5 /** * Set worker to subscribe to tasks in the matching profile (tasker.profiles). * If process.env.WORKER does not match a profile, the application will not subscribe to any tasks */ worker: processenvWORKER
worker
Environment
Configure // config/env/worker.jsmoduleexports = main: /** * Only load the packs needed by the workers */ packs:
If the worker profiles each require more granular environment configurations,
create worker-cpuBound
, worker-memoryBound
, etc. environments.
Include tasks in the app object
Create a directory api/tasks
. Any task definitions will be created as classes in this directory.
Create api/tasks/index.js
to export all of the tasks.
Include this directory in api/index.js
. Here is an example:
// api/index.js exportscontrollers = exportsmodels = exportspolicies = exportsservices = exportstasks =
Usage
Define tasks in api.tasks
. Tasks are run by a worker processes.
// api/tasks/VideoEncoder.js const Task = Taskmoduleexports = /** * "message" is the message from RabbitMQ, and contains all the information * the worker needs to do its job. By default, sets this.message and this.app. * * @param message.body.videoFormat * @param message.body.videoBuffer */ { superapp message } /** * Do work here. When the work is finished (the Promise is resolved), send * "ack" to the worker queue. You must override this method. * * @return Promise */ { return } /** * This is a listener which is invoked when the worker is interrupted (specifically, * an interrupt is a particular type of message that instructs this worker to * stop). */ { thislog } /** * Perform any necessary cleanup, close connections, etc. This method will be * invoked regardless of whether the worker completed successfully or not. * @return Promise */ { return }
To start a task, publish a message via the app.tasker
interface:
const taskId = app.tasker.publish('VideoEncoder', { vidoeUrl: 'http://...' }
To interrupt a task in progress, use the taskId
that is returned from app.tasker.publish(..):
app.tasker.cancel('VideoEncoder', taskId)
Deployment
An example Procfile may look like:
web: npm start
memoryBound: NODE_ENV=worker WORKER=memoryBound npm start
cpuBound: NODE_ENV=worker WORKER=cpuBound npm start
License
MIT