exframe Workflow

A library for facilitating a distributed workflow-as-a-service system

Features and Assumptions

  • RabbitMQ driven workflow engine over durable queues
  • Managed state tracked in MongoDB
  • Distributed execution of tasks
  • Full worklog tracking
  • Blocks or sub-workflows
  • Fully restartable at any point

Available Topologies

  • Workflows

Usage

const logger = require('exframe-logger').create();
const dbClient = require('exframe-db').init(options);
const cacheManager = require('exframe-cache-manager').create(options);
const { WorkflowManager } = require('exframe-workflow');

dbClient.connect();

const serviceName = 'test-service';
const workflowManager = WorkflowManager.default(logger, serviceName, cacheManager, dbClient);

function addOne(context, { inputValue }) { return { result: inputValue + 1 }; }

const myWorkflow = workflowManager.create('my-workflow');
myWorkflow.task(addOne);

const context = {
  user: { userId: 'test-id', userName: 'test-user' },
  log: logger
};

app.post('/addOne', async (request, response) => {
  const instance = await myWorkflow.start(context, { inputValue: request.body.inputValue });
  response.send(await instance.getWorkContext());
});

Summary

The WorkflowManager handles the instantiation of the WorkflowInstanceManager and the Workflow. The Workflow is bound to the WorkflowInstanceManager, and when a workflow is started or resumed it coordinates with the WorkflowInstanceManager to create or load a WorkflowInstance. A WorkflowInstance manages traversing through the tasks, joining on child workflow results, and keeping work context state.

WorkflowManager

Handles the instantiation of the WorkflowInstanceManager and the Workflow. It has been configured with a default implementation that wraps exframe-cache-manager and exframe-db for logging, caching, and coordination. The WorkflowInstanceManager listens on a queue that is unique to the serviceName; distributed coordination for starting workflows across multiple service instances relies on this queue.

static default(logger, serviceName, cacheManager, dbClient, mqClient, options?) -> WorkflowManager

field description
logger exframe-logger
serviceName string -- the unique name for the service to use for coordinating the start of new workflow instances
cacheManager exframe-cache-manager -- provides the redis backing for the WorkflowInstance cache and the expiry queue for task execution expiration
dbClient exframe-db -- provides the mongo backing for the WorkflowInstance cache and the WorkLog storage
options optional, object -- configures the WorkflowManager

options

field description
serverId string -- a unique identifier for the server running the service, can help with logging
timeoutDuration integer, default - 30 -- the number of seconds a service will wait between trying to pull items off the queue
maxWorkers integer, default - 30 -- the number of workers for the worker pool, this governs the number of concurrent workflows that can execute at any given time
overflowWorkers integer, default - 15 -- the number of workers over the max number of workers that can execute, this is a bucket reserved for incoming requests or parent workflows that are resuming after all children have completed
workTtl integer, default - 600 -- the number of seconds that a workflow execution is locked from being taken over by another service
keepaliveInterval integer, default - 60 -- the number of seconds between refreshing the locks
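
For example, a manager could be constructed with a few of these options as follows. This is a sketch: the option values are illustrative, and the mqClient argument (the message queue client from the signature above) is assumed to be created elsewhere.

const workflowManager = WorkflowManager.default(
  logger,
  'test-service',
  cacheManager,
  dbClient,
  mqClient, // message queue client (assumed to be created elsewhere)
  {
    serverId: 'server-1',   // illustrative identifier for this server instance
    maxWorkers: 50,         // allow up to 50 concurrent workflow executions
    overflowWorkers: 20,    // extra capacity reserved for incoming requests and resuming parents
    workTtl: 900            // lock executions for 15 minutes before another service may take over
  }
);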

create(name, options?) -> Workflow

field description
name string -- specifies the name of the workflow, generally for logging purposes
options? optional, object -- configures the workflow execution

options

field description
parallel boolean -- specifies whether the workflow executes its tasks sequentially or in parallel (see the example below)
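
For example, a sketch using the workflowManager from the Usage section; the workflow and task names are hypothetical:

const parallelWorkflow = workflowManager.create('parallel-workflow', { parallel: true });
// tasks added to a parallel workflow may execute concurrently rather than one after another
parallelWorkflow.task('taskA', (context, workContext) => ({ a: true }));
parallelWorkflow.task('taskB', (context, workContext) => ({ b: true }));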

Workflow

task

task(name|action, action|options, options?) -> void

Adds a task to the workflow. As a shortcut, a named function can be passed as the action in lieu of a separate string name.

workflow.task('test', (context, workContext) => {});
workflow.task(function test(context, workContext) {});
field description
name string -- the name of the task, must be unique
action function -- (context, workContext, { instanceId, index }) -> Promise<Result> -- the action that will be executed when the task is executed by the WorkflowInstance. The instanceId is the id for the workflow and index is the item index from a sourceIterator (if there is one). The result will be merged with the workContext. See Result
options optional, object -- configures the task execution

options

field description
next optional, string -- the name of the next task to execute. If not set, next is automatically set to the next task added to the workflow. If null, next will not be set automatically. If the value is a task command, that command will be executed. See Commands (and the example following this table)
pre optional, function -- (context, workContext) -> Promise<Result> -- a function that can mutate the context to be given to the action, this is mostly useful for blocks
post optional, function -- (context, workContext, result) -> Promise<Result> -- a function that can process the result relative to the work context and can further modify the result before merge into the work context, this is mostly useful for blocks
case optional, function -- (context, workContext) -> Promise<string> -- a function that can return the name of a task that should be executed next based upon the current work context following the execution of the task. If the value is a task command that command will be operated on. See Commands
catch optional, function -- (context, workContext, error) -> Promise<string> -- a function that can return the name of a task that should be executed next given the error that was thrown by the task. If the value is a task command that command will be operated on. See Commands
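
To illustrate the next option (a sketch; the task names are hypothetical), a task can override the automatic chaining or opt out of it entirely:

workflow.task('validate', (context, workContext) => ({ valid: true }), {
  next: 'publish' // jump straight to 'publish' instead of the task added next
});
workflow.task('cleanup', (context, workContext) => ({ cleaned: true }), {
  next: null // do not automatically chain to the subsequently added task
});
workflow.task('publish', (context, workContext) => ({ published: true }));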

pre

workflow.task('test',
  (context, preWorkContext) => {}, {
  pre: (context, workContext) => {
    const preWorkContext = workContext;
    return preWorkContext;
  }
});

post

workflow.task('test',
  (context, workContext) => {
    const result = workContext;
    return result;
  }, {
  post: (context, workContext, result) => {
    const postResult = result;
    return postResult;
  }
});

case

workflow.task('test',
  (context, workContext) => {}, 
  {
    case: (context, workContext) => {
      switch (workContext.value) {
        case 'test': return 'test';
        default: return null;
      }
    }
  }
);

catch

workflow.task('test',
  (context, workContext) => {}, 
  {
    catch: (context, workContext, error) => {
      switch (error.status) {
        case 403: return 'authorize';
        default: return null;
      }
    }
  }
);

block

block(name, buildSubWorkflow, options?) -> void

Adds a block task to the workflow. A block task is a wrapper around another workflow that will be executed with a separate and distinct work context. Blocks can be nested and added until the server runs out of resources.

workflow.block('test', w => {
  w.task('sub-task', (context, workContext) => {});
  w.block('sub-block', subW => {
    subW.task('sub-block-task', (context, workContext) => {});
  });
});
field description
name string -- the name of the task, must be unique
buildSubWorkflow function -- (Workflow) -> void -- will give a Workflow with an identical signature to the current Workflow but will not be directly callable by the WorkflowInstanceManager
options optional, object -- configures the block task execution

options

field description
sourceIterator optional, function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the block to. This can be an array, but it is best if it is some sort of async iterator. Even a stream can be used as an async iterator source, so, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source (see the sketch after this table).
parallel boolean, default - false -- indicates whether the created workflow should run in parallel
... see task options for the rest
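
A hedged sketch of a block driven by a sourceIterator, on the assumption that the sub-workflow runs once per item yielded by the source; the async generator here stands in for any async iterable such as a cursor or stream:

workflow.block('process-items', w => {
  w.task('handle-item', (context, workContext) => {
    // the block's work context is separate and distinct from the parent workflow's
    return { handled: true };
  });
}, {
  parallel: true, // run the per-item sub-workflows concurrently
  sourceIterator: async function* (context, workContext) {
    // any async iterator works; an array or stream could be substituted
    yield { id: 1 };
    yield { id: 2 };
  }
});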

iterate

iterate(name, source, action, options?) -> void

Adds an iteration task to the workflow. An iteration task executes the given action over each item returned by the source. This is done over an asyncIterator, so the source can be optimized into something that is not fully resident in memory, such as a cursor or a stream.

workflow.iterate('test',
  /* source */
  (context, workContext) => axios({
    ...,
    responseType: 'stream'
  }).then(response => response.data),
  /* action */
  (context, workContext) => {}
);
field description
name string -- the name of the task, must be unique
source function -- (context, workContext) -> asyncIterator -- the source for all the items to apply the action to. This can be an array, but it is best if it is some sort of async iterator. Even a stream can be used as an async iterator source, so, theoretically, an endpoint whose response can be parsed in a streaming fashion could be used as a source.
action function -- (context, workContext) -> Promise<Result> -- the action that will be executed for each item returned by the source
options optional, object -- configures the iterate task execution

options

field description
parallel boolean, default - false -- indicates whether each item should be processed in parallel. Use this with caution: if there are too many tasks, the system could be overloaded. Subsequent versions may limit the max number of tasks executing across all the services (see the sketch after this table)
... see task options for the rest
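
For example, a parallel iteration over a small in-memory array (a sketch; the task name and items are hypothetical):

workflow.iterate('notify',
  /* source */
  (context, workContext) => ['a@example.com', 'b@example.com'],
  /* action */
  (context, workContext, { instanceId, index }) => {
    // index identifies which item from the source this execution corresponds to
    return { notified: true };
  },
  { parallel: true } // items are processed concurrently; use with caution for large sources
);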

Result

The result object is what augments the work context. Fields in the result object are applied not only to the root object but also to nested fields. To assist, there are pre-merge operations that can be applied to the fields to further control the update.
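
As an illustration of that merge behavior (a sketch; the exact deep-merge semantics are assumed), a task result augments the existing work context rather than replacing it:

// workContext before the task runs (illustrative):
//   { customer: { id: 1, name: 'Ada' }, status: 'new' }
workflow.task('score', (context, workContext) => ({
  status: 'scored',          // replaces the existing root-level value
  customer: { score: 0.9 }   // merged into the existing nested customer object
}));
// workContext after the merge (assumed):
//   { customer: { id: 1, name: 'Ada', score: 0.9 }, status: 'scored' }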

Pre-Merge operations

If you have data that you want to either overwrite or delete on the workContext, you can do so by using the $overwrite and $delete pre-merge operations. These operations are performed on the workContext before the workItemContext is merged into the workContext and persisted.

$delete operation

workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    unwantedProperty: { $delete: 1 }
  })
});

OR

workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      unwantedProperty: { $delete: 1 }
    };
  }
});

$overwrite operation

workflow.task(someTask, {
  post: (context, workContext, result) => ({
    ...result,
    changingProperty: {
      $overwrite: {
        newData: 'x'
      }
    }
  })
});

OR

workflow.task('someTask', {
  action: (context, workContext) => {
    return {
      changingProperty: {
        $overwrite: {
          newData: ['x']
        }
      }
    };
  }
});

Commands

Task commands influence what the workflow will do. Inherently, all next values are commands to go to the next task. Accordingly, commands adjust what happens following the execution of a task.

$pause

workflow.task('someTask', {
  action: (context, workContext) => ({ test: true }),
  case: (context, workContext) => {
    if (workContext.test) return '$pause';
  }
});
