# exframe Workflow

A library for facilitating a distributed workflow-as-a-service system.

## Features and Assumptions

- RabbitMQ-driven workflow engine over durable queues
- Managed state tracked in MongoDB
- Distributed execution of tasks
- Full worklog tracking
- Blocks, or sub-workflows
- Fully restartable at any point

## Available Topologies

- Workflows

## Usage
```js
const logger = require('exframe-logger').create();
const dbClient = require('exframe-db').init(options);
const cacheManager = require('exframe-cache-manager').create(options);
const { WorkflowManager } = require('exframe-workflow');

dbClient.connect();

const serviceName = 'test-service';
const workflowManager = WorkflowManager.default(logger, serviceName, cacheManager, dbClient);

function addOne(context, { inputValue }) { return { result: inputValue + 1 }; }

const myWorkflow = workflowManager.create('my-workflow');
myWorkflow.task(addOne);

const context = {
  user: { userId: 'test-id', userName: 'test-user' },
  log: logger
};

app.post('/addOne', async (request, response) => {
  const instance = await myWorkflow.start(context, { inputValue: request.body.inputValue });
  response.send(await instance.getWorkContext());
});
```
## Summary

The `WorkflowManager` handles the instantiation of the `WorkflowInstanceManager` and the `Workflow`. The `Workflow` is bound to the `WorkflowInstanceManager`, and when a workflow is started or resumed it coordinates with the `WorkflowInstanceManager` to create or load a `WorkflowInstance`. A `WorkflowInstance` manages traversing the tasks, joining on child workflow results, and keeping the work context state.

## WorkflowManager

Handles the instantiation of the `WorkflowInstanceManager` and the `Workflow`. It has been configured with a default implementation that wraps `exframe-cache-manager` and `exframe-db` for logging, caching, and coordination. The `WorkflowInstanceManager` listens on a queue that is unique per `serviceName`; the distributed coordination for starting workflows across multiple instances relies on this.
### static default(logger, serviceName, cacheManager, dbClient, mqClient, options?) -> WorkflowManager

| field | description |
|---|---|
| logger | `exframe-logger` |
| serviceName | `string` -- the unique name for the service to use for coordinating the start of new workflow instances |
| cacheManager | `exframe-cache-manager` -- provides the redis backing for the `WorkflowInstance` cache and the expiry queue for task execution expiration |
| dbClient | `exframe-db` -- provides the mongo backing for the `WorkflowInstance` cache and the `WorkLog` storage |
| options | optional, `object` -- configures the `WorkflowManager` |
#### options

| field | description |
|---|---|
| serverId | `string` -- a unique identifier for the server running the service; can help with logging |
| timeoutDuration | `integer`, default 30 -- the number of seconds a service will wait between trying to pull items off the queue |
| maxWorkers | `integer`, default 30 -- the number of workers in the worker pool; this governs the number of workflows that can execute concurrently at any given time |
| overflowWorkers | `integer`, default 15 -- the number of workers allowed over the maximum; this bucket is reserved for incoming requests or parent workflows that are resuming after all children have completed |
| workTtl | `integer`, default 600 -- the number of seconds that a workflow execution is locked from being taken over by another service |
| keepaliveInterval | `integer`, default 60 -- the number of seconds between refreshing the locks |
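Put together, an options object passed to `default` might look like the following sketch; every value shown is the documented default except `serverId`, which is an illustrative placeholder:

```js
// Every value below is the documented default; serverId is an illustrative placeholder.
const workflowManagerOptions = {
  serverId: 'api-server-1',  // illustrative identifier, useful in logs
  timeoutDuration: 30,       // seconds to wait between queue-pull attempts
  maxWorkers: 30,            // size of the worker pool (max concurrent workflows)
  overflowWorkers: 15,       // extra workers for incoming requests and resuming parents
  workTtl: 600,              // seconds an execution stays locked to one service
  keepaliveInterval: 60      // seconds between lock refreshes
};
```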
### create(name, options?) -> Workflow

| field | description |
|---|---|
| name | `string` -- specifies the name of the workflow, generally for logging purposes |
| options | optional, `object` -- configures the workflow execution |

#### options

| field | description |
|---|---|
| parallel | `boolean` -- specifies whether the workflow executes its tasks sequentially or in parallel (essentially vertically or horizontally) |
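The vertical/horizontal distinction can be pictured with plain promises; this is only a conceptual sketch, not the library's scheduler:

```js
// Conceptual sketch only -- not exframe-workflow's scheduler.
const demoTasks = [async () => 'a', async () => 'b', async () => 'c'];

// parallel: false -- tasks run vertically, one after another
async function runSequential(tasks) {
  const results = [];
  for (const task of tasks) results.push(await task());
  return results;
}

// parallel: true -- tasks run horizontally, all at once
const runParallel = tasks => Promise.all(tasks.map(task => task()));
```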
## Workflow

### task

`task(name|action, action|options, options?) -> void`

Adds a task to the workflow. As a shortcut, the action can be a named function and used in lieu of passing the string name:

```js
workflow.task('test', (context, workContext) => {});
workflow.task(function test(context, workContext) {});
```
| field | description |
|---|---|
| name | `string` -- the name of the task, must be unique |
| action | `function` -- `(context, workContext, { instanceId, index }) -> Promise<Result>` -- the action that will be executed when the task is run by the `WorkflowInstance`. The `instanceId` is the id of the workflow and `index` is the item index from a `sourceIterator` (if there is one). The result will be merged into the workContext. See Result |
| options | optional, `object` -- configures the task execution |

#### options

| field | description |
|---|---|
| next | optional, `string` -- the name of the next task to execute. If not set, next will automatically be set to whichever task is added next. If `null`, next will not be automatically set. If the value is a task command, that command will be executed. See Commands |
| pre | optional, `function` -- `(context, workContext) -> Promise<Result>` -- a function that can mutate the context given to the action; mostly useful for blocks |
| post | optional, `function` -- `(context, workContext, result) -> Promise<Result>` -- a function that can process the result relative to the work context and further modify it before it is merged into the work context; mostly useful for blocks |
| case | optional, `function` -- `(context, workContext) -> Promise<string>` -- a function that can return the name of the task to execute next based upon the current work context following the execution of the task. If the value is a task command, that command will be executed. See Commands |
| catch | optional, `function` -- `(context, workContext, error) -> Promise<string>` -- a function that can return the name of the task to execute next given the error thrown by the task. If the value is a task command, that command will be executed. See Commands |
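How `next` and `case` combine to pick the following task can be sketched with a minimal dispatcher; the task names and the exact resolution rule below are illustrative, not the library's implementation:

```js
// Illustrative dispatcher -- not exframe-workflow's implementation.
// case (when present) picks the next task; a falsy case result falls back to next.
const taskTable = {
  start: {
    action: wc => ({ value: 'test' }),
    next: 'finish',
    case: wc => (wc.value === 'test' ? 'handleTest' : null)
  },
  handleTest: { action: wc => ({ handled: true }), next: null },
  finish: { action: wc => ({ finished: true }), next: null }
};

function runTasks(name, workContext = {}) {
  while (name) {
    const task = taskTable[name];
    Object.assign(workContext, task.action(workContext)); // merge result into work context
    name = task.case ? (task.case(workContext) || task.next) : task.next;
  }
  return workContext;
}
```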
#### pre

```js
workflow.task('test',
  (context, preWorkContext) => {}, {
    pre: (context, workContext) => {
      const preWorkContext = workContext;
      return preWorkContext;
    }
  });
```
#### post

```js
workflow.task('test',
  (context, workContext) => {
    const result = workContext;
    return result;
  }, {
    post: (context, workContext, result) => {
      const postResult = result;
      return postResult;
    }
  });
```
#### case

```js
workflow.task('test',
  (context, workContext) => {},
  {
    case: (context, workContext) => {
      switch (workContext.value) {
        case 'test': return 'test';
        default: return null;
      }
    }
  }
);
```
#### catch

```js
workflow.task('test',
  (context, workContext) => {},
  {
    catch: (context, workContext, error) => {
      switch (error.status) {
        case 403: return 'authorize';
        default: return null;
      }
    }
  }
);
```
### block

`block(name, buildSubWorkflow, options?) -> void`

Adds a block task to the workflow. A block task is a wrapper around another workflow that will be executed with a separate and distinct work context. Blocks can be nested until the server runs out of resources.

```js
workflow.block('test', w => {
  w.task('sub-task', (context, workContext) => {});
  w.block('sub-block', subW => {
    subW.task('sub-block-task', (context, workContext) => {});
  });
});
```
| field | description |
|---|---|
| name | `string` -- the name of the task, must be unique |
| buildSubWorkflow | `function` -- `(Workflow) -> void` -- provides a `Workflow` with a signature identical to the current `Workflow`, but one that is not directly callable by the `WorkflowInstanceManager` |
| options | optional, `object` -- configures the block task execution |

#### options

| field | description |
|---|---|
| sourceIterator | optional, `function` -- `(context, workContext) -> asyncIterator` -- the source of the items to apply the action to. This can be an array, but it is best if it is some sort of async iterator. Even a stream can be used as an async iterator source, so, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source |
| parallel | `boolean`, default false -- indicates whether the created workflow should run in parallel |
| ... | see the task options for the rest |
### iterate

`iterate(name, source, action, options?) -> void`

Adds an iteration task to the workflow. An iteration task will execute the given action over each item returned by the source. This is done over an asyncIterator, so the source can be optimized into something that is not fully resident in memory, such as a cursor or stream.

```js
workflow.iterate('test',
  /* source */
  (context, workContext) => axios({
    /* ... */
    responseType: 'stream'
  }).then(response => response.data),
  /* action */
  (context, workContext) => {}
);
```
| field | description |
|---|---|
| name | `string` -- the name of the task, must be unique |
| source | `function` -- `(context, workContext) -> asyncIterator` -- the source of the items to apply the action to. This can be an array, but it is best if it is some sort of async iterator. Even a stream can be used as an async iterator source, so, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source |
| action | `function` -- `(context, workContext) -> Promise<Result>` -- the action that will be executed for each item returned by the source |
| options | optional, `object` -- configures the iterate task execution |

#### options

| field | description |
|---|---|
| parallel | `boolean`, default false -- indicates whether the items should be processed in parallel. Use with caution: if there are too many tasks, the system could be overloaded. Subsequent versions could limit the maximum number of tasks executing across all the services |
| ... | see the task options for the rest |
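When parallel iteration risks overloading the system, the usual safeguard is to bound the concurrency. This sketch is not part of the library; it is just one way to cap in-flight work:

```js
// Bounded-parallelism sketch -- not part of exframe-workflow.
// Runs fn over an (async) iterable with at most `limit` promises in flight.
async function mapLimit(iterable, limit, fn) {
  const results = [];
  const executing = new Set();
  for await (const item of iterable) {
    const p = Promise.resolve(fn(item)).then(value => {
      executing.delete(p); // free a slot once this item settles
      return value;
    });
    executing.add(p);
    results.push(p);
    if (executing.size >= limit) await Promise.race(executing);
  }
  return Promise.all(results);
}
```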
## Result

The result object is what augments the work context. Fields in the result object are applied not only to the root object but also to nested fields. To assist, there are pre-merge operations that can be applied to fields to further control the update.

### Pre-Merge operations

If you have data that you want to overwrite or delete on the workContext, you can do so using the `$overwrite` and `$delete` pre-merge operations. These operations are performed on the workContext before the workItemContext is merged into the workContext and persisted.
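Conceptually, the pre-merge operations behave like the deep merge below; this function is only an illustration of the behavior, not the library's actual merge code:

```js
// Illustrative deep merge honoring $overwrite and $delete -- not the library's code.
function applyResult(workContext, result) {
  for (const [key, value] of Object.entries(result)) {
    if (value && typeof value === 'object' && '$delete' in value) {
      delete workContext[key];               // drop the field entirely
    } else if (value && typeof value === 'object' && '$overwrite' in value) {
      workContext[key] = value.$overwrite;   // replace instead of merging
    } else if (value && typeof value === 'object' && !Array.isArray(value)) {
      workContext[key] = applyResult(workContext[key] || {}, value); // normal deep merge
    } else {
      workContext[key] = value;
    }
  }
  return workContext;
}
```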
#### $delete operation

```js
workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    unwantedProperty: { $delete: 1 }
  })
});
```

OR

```js
workflow.task('someTask', (context, workContext) => {
  return {
    unwantedProperty: { $delete: 1 }
  };
});
```
#### $overwrite operation

```js
workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    changingProperty: {
      $overwrite: {
        newData: 'x'
      }
    }
  })
});
```

OR

```js
workflow.task('someTask', (context, workContext) => {
  return {
    changingProperty: {
      $overwrite: {
        newData: ['x']
      }
    }
  };
});
```
## Commands

Task commands influence what the workflow will do. Inherently, all `next` values are commands to go to the next task. Accordingly, commands adjust what happens following the execution of a task.

### $pause

```js
workflow.task('someTask',
  (context, workContext) => ({ test: true }),
  {
    case: (context, workContext) => {
      if (workContext.test) return '$pause';
    }
  }
);
```