
    redis-workflow

    0.5.1 • Public • Published

    Redis Workflow

    A dynamic rules engine that lets you configure workflows in your app without code changes. With Redis as the backing service, you design and store workflows; when you start the manager, it loads your stored workflow(s) and attaches a pubsub listener to Redis. Each time a message arrives on your pubsub channel, the manager parses the event object, evaluates the workflow's conditions, and, if they are true, emits one or more defined actions.

    This app is loosely based on the workflow rules or business process features of popular enterprise systems.

    • Trigger (or event) - something happened
    • Conditions (or filters) - criteria to take action
    • Actions (or tasks) - what to do
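
    The three parts above can be sketched without Redis at all. The types and the evaluate function below are hypothetical stand-ins written for illustration, not the redis-workflow classes, and the sketch assumes that every condition must hold before any action is emitted.

```typescript
// Hypothetical stand-ins for illustration; these are NOT the redis-workflow classes.
type Context = Record<string, unknown>;

interface WorkflowEvent {
    event: string;     // trigger name: "something happened"
    context: Context;  // data the conditions are checked against
}

interface SimpleWorkflow {
    trigger: string;                                   // which event starts the workflow
    conditions: Array<(context: Context) => boolean>;  // criteria to take action
    actions: string[];                                 // what to do, by name
}

// Returns the action names to emit. The list is empty when the trigger does
// not match or when any condition fails (assumption: all conditions must hold).
function evaluate(workflow: SimpleWorkflow, message: WorkflowEvent): string[] {
    if (message.event !== workflow.trigger) {
        return [];
    }
    const allPass = workflow.conditions.every((check) => check(message.context));
    return allPass ? [...workflow.actions] : [];
}

const orderWorkflow: SimpleWorkflow = {
    trigger: "newOrder",
    conditions: [
        (ctx) => {
            const inStock = ctx.inStock;
            return typeof inStock === "number" && inStock > 0;
        },
    ],
    actions: ["adjustInventory", "shipProduct"],
};

const fired = evaluate(orderWorkflow, { event: "newOrder", context: { inStock: 3 } });
const skipped = evaluate(orderWorkflow, { event: "newOrder", context: { inStock: 0 } });
console.log({ fired, skipped });
```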

    Requirements

    You must have a Redis server running. This app treats Redis as a persistent backup: it uses pubsub as a listener for trigger events, and it saves and loads workflows in Redis to avoid losing them. The set and get workflows methods interact only with memory, not with the database. See the IWorkflowManager interface source for more.

    Extensibility

    Most non-public properties and methods are protected so you may sub-class and override. RedisWorkflowManager is my preferred implementation but you could also write a new manager class that implements IWorkflowManager interface.

    Installation

    npm install redis-workflow
    yarn add redis-workflow

    Test

    The test script in package.json preprocesses the .ts file and then executes.

    npm test (you can also run npm run coverage)

    Usage

    The source is written in TypeScript and compiles to JavaScript (npm run build). You can use it in ES5 or later environments. The following code snippets are implemented in the __tests__ folder.

    Quick start (Node)

    const redis = require("redis");
    const flow = require("redis-workflow");
     
    // instantiate
    const config = new flow.RedisConfig("localhost", 6379, null, null);
    const client = redis.createClient(); // optionally pass into manager 2nd param to share
    const manager = new flow.RedisWorkflowManager(config);
     
    // create first workflow
    const trigger = new flow.Trigger("myTrigger");
    const rule = new flow.Rule("myRule", `foo == "bar"`); // compare foo to the string "bar"
    const action = new flow.ImmediateAction("myAction");
    const workflow = new flow.Workflow("myWorkflow", trigger, [rule], [action]);
     
    // add workflow to manager
    manager.setWorkflows({"myChannel": [workflow]});
     
    // add listener for action
    manager.on("myAction", (action) => {
        // perform something here
        console.log({action}); // should see after pubsub event below
    });
     
    // add error handler
    manager.on("error", (error) => {
        // handle errors here
        console.error(`An error occurred: `, error);
    })
     
    // start workflow engine
    manager.start("myChannel");
     
    // publish a test object to the pubsub channel
    setTimeout(() => {
        const event = {event: "myTrigger", context: {foo: "bar"}};
        client.publish("myChannel", JSON.stringify(event), (err, reply) => {
            console.log({err, reply});
        });
    }, 2500);
     
    // sometime later ...
    setTimeout(() => {
        manager.stop("myChannel"); // unsubscribe from pubsub and stop emitting actions
    }, 5000);
     
    // note: if you publish an event and its conditions do not evaluate to true, no action is taken

    TypeScript

    See more detailed examples in the respective src/__tests__ and src/lib/__tests__ folders.

    Example with multiple rules and actions for a workflow

    import * as redis from "redis";
    import * as flow from "redis-workflow"; // optionally import each class (see __tests__)
     
    const config: flow.RedisConfig = new flow.RedisConfig("localhost", 6379, null, null);
    const manager: flow.IWorkflowManager = new flow.RedisWorkflowManager(config);
    const publisher: redis.RedisClient = redis.createClient(); // just for our example to publish the pubsub message
     
    // build test workflow
    const trigger: flow.ITrigger = new flow.Trigger("test.trigger101");
    const rule1: flow.IRule = new flow.Rule("Foo should equal bar", `foo == "bar"`);
    const rule2: flow.IRule = new flow.Rule("Should be in stock", "inStock > 0");
    const action1: flow.IAction = new flow.DelayedAction("shipProduct").delay(5, "days"); // context added later when triggered
    const action2: flow.IAction = new flow.ImmediateAction("adjustInventory");
    const workflow: flow.IWorkflow = new flow.Workflow("test.workflow1", trigger, [rule1, rule2], [action1, action2]);
     
    // add first workflow to manager
    manager.setWorkflows({"babyDivision": [workflow]}); // same channel the manager starts on below
     
    // errors
    manager.on(flow.WorkflowEvents.Error, (error) => {
        console.error(`Something bad happened: `, error);
    });
     
    // delayed actions
    manager.on(flow.WorkflowEvents.Schedule, (action) => {
        // handle delayed actions how you like (use your own scheduler)
        console.log(`Delayed action received!`);
     
        switch (action.getName()) {
            case "shipProduct":
                // schedule with fulfillment
                break;
            default:
                // add to cron
                break;
        }
    });
     
    // immediate actions
    manager.on(flow.WorkflowEvents.Immediate, (action) => {
        // handle immediate actions
        console.log(`Immediate action received!`);
     
        switch (action.getName()) {
            case "adjustInventory":
                // task inventory management
                break;
            default:
                // global handler or notification
                break;
        }
    });
     
    // optionally handle actions by name
    manager.on("adjustInventory", (action) => {
        // do something here
        const item: string = action.getContext().foo;
        const inStock: number = action.getContext().inStock;
        console.log(`Adjusting inventory for '${item}' from ${inStock} to ${inStock - 1}`);
    });
     
    // optionally handle global actions (all types)
    manager.on(flow.WorkflowEvents.Audit, (action) => {
        // publish action to stream pipeline
    });
     
    // optionally handle actions that didn't meet rules
    manager.on(flow.WorkflowEvents.Invalid, (message) => {
        // do something
        switch (message.event) {
            case "newOrder":
                // notify cust svc to try and complete order
                break;
            default:
                // log somewhere
                break;
        }
    });
     
    // start manager (subscribes to pubsub channel)
    manager.start("babyDivision")
        .then(() => {
            // do something if you like
        });
     
    // build and publish trigger events to Redis pubsub channel
    const message: {[key: string]: any} = {
        context: {
            foo: "bar",
            inStock: 3,
        },
        event: "test.trigger101",
    };
     
    // simulate time after manager starts before triggers appear
    setTimeout(() => {
        publisher.publish("babyDivision", JSON.stringify(message), (err: Error, reply: any) => {
            // simulate time later shutting down workflow channel
            setTimeout(() => {
                manager.stop("babyDivision");
            }, 3000);
        });
    }, 3000);
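
    The example leaves scheduling of delayed actions to you. One possible starting point is to convert a delay such as (5, "days") into an absolute due time that your own scheduler can persist. The unit names and both helper functions below are assumptions made for illustration; they are not part of the redis-workflow API.

```typescript
// Hypothetical helpers: the unit names and these functions are assumptions
// for illustration, not part of the redis-workflow API.
type DelayUnit = "seconds" | "minutes" | "hours" | "days";

const MS_PER_UNIT: Record<DelayUnit, number> = {
    seconds: 1000,
    minutes: 60 * 1000,
    hours: 60 * 60 * 1000,
    days: 24 * 60 * 60 * 1000,
};

// Convert an (amount, unit) delay into milliseconds.
function delayToMs(amount: number, unit: DelayUnit): number {
    if (!Number.isFinite(amount) || amount < 0) {
        throw new RangeError(`Invalid delay amount: ${amount}`);
    }
    return amount * MS_PER_UNIT[unit];
}

// Compute the absolute time at which a delayed action becomes due.
function dueAt(now: Date, amount: number, unit: DelayUnit): Date {
    return new Date(now.getTime() + delayToMs(amount, unit));
}

const receivedAt = new Date("2020-01-01T00:00:00.000Z");
const shipOn = dueAt(receivedAt, 5, "days");
console.log(shipOn.toISOString());
```

    Storing the due time rather than calling setTimeout directly is a deliberate choice here: timers do not survive a process restart, and setTimeout cannot represent delays longer than about 24.8 days.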

    Triggers

    The manager listens on Redis pubsub for events that start a workflow.

    const trigger: ITrigger = new Trigger("test");

    Payload

    The message published to the channel must be a stringified JSON object, as follows.

    {
        "event": "test", // must equal Trigger name
        "context": {"myField": "myValue", "anotherField": "anotherValue"}
    }
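
    Before publishing, or when debugging why a trigger never fires, it can help to check a raw message against the shape above. This is a minimal sketch: parseTriggerMessage and the TriggerMessage type are hypothetical names, and the checks reflect only the payload format described here, not the library's own parsing.

```typescript
// Hypothetical type and helper for illustration; not exported by redis-workflow.
interface TriggerMessage {
    event: string;                     // must equal the Trigger name
    context: Record<string, unknown>;  // fields the rules are evaluated against
}

// Returns the parsed message, or null if the raw string does not match the
// payload shape (invalid JSON, missing event, or non-object context).
function parseTriggerMessage(raw: string): TriggerMessage | null {
    let data: unknown;
    try {
        data = JSON.parse(raw);
    } catch {
        return null;
    }
    if (typeof data !== "object" || data === null) {
        return null;
    }
    const { event, context } = data as { event?: unknown; context?: unknown };
    if (typeof event !== "string" || event.length === 0) {
        return null;
    }
    if (typeof context !== "object" || context === null || Array.isArray(context)) {
        return null;
    }
    return { event, context: context as Record<string, unknown> };
}

const good = parseTriggerMessage(JSON.stringify({ event: "test", context: { myField: "myValue" } }));
const bad = parseTriggerMessage("not json");
console.log({ good, bad });
```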

    Conditions

    Uses the mozjexl JavaScript expression language to evaluate string expressions, each of which evaluates to true or false.

    const rule: IRule = new Rule("Field must be valid", `myField == "myValue"`);

    Actions

    You define actions when building workflows. The action name will become an EventEmitter event you handle.

    // optionally named
    manager.on("eventName", (action) => {
        // handle action here
    });
     
    manager.on(flow.WorkflowEvents.Schedule, (action) => {
        // handle action here
    });
     
    manager.on(flow.WorkflowEvents.Immediate, (action) => {
        // handle action here
    });

    For each Action you add to your workflow, you add one or more listeners like those above. You decide what your application does when conditions are met and actions are emitted.
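
    To see how one action can reach both a named listener and a type-level listener, here is a small sketch built on Node's EventEmitter. The EmittedAction shape, the "immediate" event name, and the dispatch function are assumptions for illustration; the real event names come from WorkflowEvents, and the order in which the library emits them is not documented here.

```typescript
import { EventEmitter } from "events";

// Hypothetical action shape for illustration; not the library's IAction.
interface EmittedAction {
    name: string;
    kind: "immediate" | "delayed";
    context: Record<string, unknown>;
}

const emitter = new EventEmitter();
const seen: string[] = [];

// Listener registered under the action's own name.
emitter.on("adjustInventory", (action: EmittedAction) => {
    seen.push(`named:${action.name}`);
});

// Listener registered under the action's type (assumed event name).
emitter.on("immediate", (action: EmittedAction) => {
    seen.push(`type:${action.name}`);
});

// Emit the same action once by name and once by type.
function dispatch(action: EmittedAction): void {
    emitter.emit(action.name, action);
    emitter.emit(action.kind, action);
}

dispatch({ name: "adjustInventory", kind: "immediate", context: { inStock: 3 } });
console.log(seen);
```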

    Suggested action types

    • Create record(s)
    • Update record(s)
    • Trigger another workflow
    • Send message(s) or notification(s)

    Scaling

    This pubsub implementation can scale by using a different channel per instance, with a fanout. If you want workers, I would override the start method to use the Redis BRPOPLPUSH command (a blocking pop from a list) and push trigger events onto that list instead of publishing them. This way you can spin up unlimited workers, and once one worker pops an event from the list, the others never see it.
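
    The worker idea can be illustrated with an in-memory model of the list operations involved. InMemoryLists below is a stand-in written for this sketch, not a Redis client, and workOnce and the key names are hypothetical. Against a real server you would use the blocking command instead (Redis documents BRPOPLPUSH as deprecated since 6.2 in favor of BLMOVE).

```typescript
// In-memory stand-in for the Redis list commands used here; NOT a Redis client.
class InMemoryLists {
    private readonly lists = new Map<string, string[]>();

    private list(key: string): string[] {
        let items = this.lists.get(key);
        if (!items) {
            items = [];
            this.lists.set(key, items);
        }
        return items;
    }

    // Push onto the head of the list, like LPUSH.
    lpush(key: string, value: string): number {
        const items = this.list(key);
        items.unshift(value);
        return items.length;
    }

    // Pop from the tail of source and push onto the head of destination in
    // one step, like RPOPLPUSH (the non-blocking form of BRPOPLPUSH).
    rpoplpush(source: string, destination: string): string | null {
        const value = this.list(source).pop();
        if (value === undefined) {
            return null;
        }
        this.list(destination).unshift(value);
        return value;
    }

    // Remove the first occurrence of value, like LREM key 1 value.
    lrem(key: string, value: string): number {
        const items = this.list(key);
        const index = items.indexOf(value);
        if (index === -1) {
            return 0;
        }
        items.splice(index, 1);
        return 1;
    }

    llen(key: string): number {
        return this.list(key).length;
    }
}

const store = new InMemoryLists();
store.lpush("triggers", JSON.stringify({ event: "newOrder", context: { id: 1 } }));
store.lpush("triggers", JSON.stringify({ event: "newOrder", context: { id: 2 } }));

// Hypothetical worker step: claim one event, process it, then acknowledge it.
function workOnce(worker: string): string | null {
    const processing = `processing:${worker}`;
    const raw = store.rpoplpush("triggers", processing);
    if (raw === null) {
        return null; // nothing queued
    }
    // ... evaluate rules and emit actions here ...
    store.lrem(processing, raw); // acknowledge once handled
    return raw;
}

const first = workOnce("worker-a");
const second = workOnce("worker-b");
const third = workOnce("worker-a");
console.log({ first, second, third });
```

    Moving each event to a per-worker processing list, rather than simply popping it, keeps a copy around if a worker crashes before it acknowledges the event.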

    Resilience

    If you instantiate the manager and pass in the optional third argument, channels: string[], the app will attempt to load workflows from the database (Redis).

    Contributing

    I haven't thought that far ahead yet. I needed this for my project and wanted to give back. ;-)

    License

    MIT (if you enhance it, fork and PR so the community benefits)

    Version: 0.5.1
    License: MIT
    Unpacked size: 144 kB
    Total files: 58
    Collaborators: mikesparr