Nonstop Progressive Marxism
    Share your code. npm Orgs help your team discover, share, and reuse code. Create a free org »

    stepifypublic

    stepify

    Build Status NPM version

    stepify是一个简单易用的Node.js异步流程控制库,提供一种比较灵活的方式完成Node.js(多)任务。

    目标是将复杂的任务进行拆分成多步完成,使得每一步的执行过程更加透明,化繁为简。

    stepify侧重流程控制,甚至可以和其他异步类库配合使用。

    stepify特点

    • 最基本的API的就3个:step()done()run(),简单容易理解。

    • 精细的粒度划分(同时支持单/多任务),执行顺序可定制化。

    • 每一个异步操作都经过特殊的封装,内部只需要关心这个异步的执行过程。

    • 链式(chain)调用,代码逻辑看起来比较清晰。

    • 灵活的回调函数定制和参数传递。

    • 统一处理单个异步操作的异常,也可根据需要单独处理某个任务的异常。

    最简单的用法

    简单实现基于oauth2授权获取用户基本资料的例子:

    // Authorizing based on oauth2 workflow
    Stepify()
        .step('getCode', function(appId, rUri) {
            var root = this;
            request.get('[authorize_uri]', function(err, res, body) {
                root.done(err, JSON.parse(body).code);
            });
        }, [appId], [redirectUri])
        .step('getToken', function(code) {
            var root = this;
            request.post('[token_uri]', function(err, res, body) {
                root.done(err, JSON.parse(body).access_token);
            });
        })
        .step('getInfo', function(token) {
            request.get('[info_uri]?token=' + token, function(err, res, body) {
                // got user info, pass it to client via http response
            });
        })
        .run();

    多个step共用一个handle、静态参数、动态参数传递的例子:

    Stepify()
        .step('read', __filename)
        .step(function(buf) {
            // buf is the buffer content of __filename
            var root = this;
            var writed = 'test.js';
     
            // do more stuff with buf
            // this demo just replace all spaces simply
            buf = buf.toString().replace(/\s+/g, '');
            fs.writeFile(writed, buf, function(err) {
                // writed is the name of target file,
                // it will be passed into next step as the first argument
                root.done(err, writed);
            });
        })
        .step('read')
        // `read` here is a common handle stored in workflow
        .read(function(p, encoding) {
            fs.readFile(p, encoding || null, this.done.bind(this));
        })
        .run();

    这里多了一个read()方法,但read方法并不是stepify内置的方法。实际上,您可以任意“扩展”stepify链!它的奥妙在于step()方法的参数,详细请看step调用说明

    可以看到,一个复杂的异步操作,通过stepify定制,每一步都是那么清晰可读!

    安装

    $ npm install stepify

    运行测试

    $ npm install
    $ mocha

    灵活使用

    var Stepify = require('stepify');
    var workflow1 = Stepify().step(fn).step(fn)...run();
    // or
    var workflow2 = new Stepify().step(fn).step(fn)...run();
    // or
    var workflow3 = Stepify().step(fn).step(fn);
    // do some stuff ...
    workflow3.run();
    // or
    var workflow4 = Stepify().task('foo').step(fn).step(fn).task('bar').step(fn).step(fn);
    // do some stuff ...
    workflow4.run(['foo', 'bar']);
    var workflow5 = Stepify().step(fn).step(fn);
    workflow5.debug = true;
    workflow5.error = function(err) {};
    workflow5.result = function(result) {};
    ...
    workflow5.run();
    // more ...

    注:文档几乎所有的例子都是采用链式调用,但是拆开执行也是没有问题的。

    原理

    概念:

    • task:完成一件复杂的事情,可以把它拆分成一系列任务,这些个任务有可能它的执行需要依赖上一个任务的完成结果,它执行的同时也有可能可以和其他一些任务并行,串行并行相结合,这其实跟真实世界是很吻合的。

    • step:每一个task里边可再细分,可以理解成“一步一步完成一个任务(Finish a task step by step)”,正所谓“一步一个脚印”是也。

    stepify内部实际上有两个主要的类,一个是Stepify,一个是Step。

    Stepify()的调用会返回一个Stepify实例,在这里称之为workflow,用于调度所有task的执行。

    step()的调用会创建一个Step实例,用于完成具体的异步操作(当然也可以是同步操作,不过意义不大),step之间使用简单的api(done方法和next方法)传递。

    API 文档

    Stepify类:

    调用Stepify即可创建一个workflow。

    Step类:

    Step类只在Stepify实例调用step方法时创建,不必显式调用。


    debug()

    描述:开启debug模式,打印一些log,方便开发。

    调用:debug(flag)

    参数:

    • {Boolean} flag 默认是false

    例子:

    var work = Stepify().debug(true);
    // or
    var work = Stepify();
    work.debug = true;

    task()

    描述:显式创建一个task,task()的调用是可选的。在新定制一个task时,如果没有显式调用task(),则这个task的第一个step()内部会先生成一个task,后续的step都是挂在这个task上面,每一个task内部会维持自己的step队列。多个task使用pend方法分割。

    调用:task([taskName])

    参数:

    • {String} taskName 可选参数,默认是_UNAMED_TASK_[index]。为这个task分配一个名字,如果有多个task实例并且执行顺序需要(使用run()方法)自定义,则设置下taskName方便一点。

    例子:

    var myWork1 = Stepify().task('foo').step('step1').step('step2').run();
    // equal to
    var myWork1 = Stepify().step('step1').step('step2').run();
    // multiply tasks
    var myWork2 = Stepify()
        .task('foo')
         .step(fn)
            .step(fn)
        .task('bar')
         .step(fn)
            .step(fn)
        .task('baz')
         .step(fn)
            .step(fn)
        .run();

    step()

    描述:定义当前task的一个异步操作,每一次step()调用都会实例化一个Step推入task的step队列。这个方法是整个lib的核心所在。

    调用:step(stepName, stepHandle, *args)

    参数:

    • {String} stepName 可选参数,但在不传stepHandle时是必传参数。为这个step分配一个名称。当stepHandle没有传入时,会在Stepify原型上扩展一个以stepName命名的方法,而它具体的实现则在调用stepName方法时决定,这个方法详情请看stepName说明

    • {Function} stepHandle 可选参数,但在stepName不传时是必传参数。在里边具体定义一个异步操作的过程。stepHandle的执行分两步,先查找这个step所属的task上有没有stepHandle,找不到则查找Stepify实例上有没有stepHandle,再没有就抛异常。

    • {Mix} *args 可选参数,表示这个step的已知参数(即静态参数),在stepHandle执行的时候会把静态参与动态参数(通过done或者next传入)合并作为stepHandle的最终参数。

    例子:

    • 参数传递
    Stepify()
        .step(function() {
         var root = this;
            setTimeout(function() {
             // 这里done的第二个参数(100)即成为下一个stepHandle的动态参数
             root.done(null, 100);
            }, 100);
        })
        .step(function(start, n) {
         // start === 50
            // n === 100
            var root = this;
            setTimeout(function() {
             root.done();
            }, start + n);
        }, 50)
        .run();
    • 扩展原型链
    Stepify()
        .step('sleep')
        // more step ...
        .step('sleep', 50)
        .sleep(function(start, n) {
            var args = [].slice.call(arguments, 0);
            var root = this;
     
            n = args.length ? args.reduce(function(mem, arg) {return mem + arg;}) : 100;
            setTimeout(function() {
                root.done(null, n);
            }, n);
        })
        .run();

    pend()

    描述:结束一个task的定义,会影响扩展到Stepify原型链上的stepName方法的执行。

    调用:pend()

    参数: 无参数。

    例子:见*stepName*部分

    stepName()

    描述:这是一个虚拟方法,它是通过动态扩展Stepify类原型链实现的,具体调用的名称由step方法的stepName参数决定。扩展原型链的stepName的必要条件是step方法传了stepName(stepName需要是一个可以通过.访问属性的js变量)但是stepHandle没有传,且stepName在原型链上没定义过,当workflow执行结束之后会删除已经扩展到原型链上的所有方法。当调用实例上的stepName方法时,会检测此时有没有在定义的task(使用pend方法结束一个task的定义),如果有则把传入的handle挂到到这个task的handles池里,没有则挂到Stepify的handles池。

    调用:stepName(stepHandle)

    参数:

    • {Function} stepHandle 必传参数,定义stepName对应的stepHandle,可以在多个task之间共享。

    例子:

    • pend()的影响
    Stepify()
        .step('mkdir', './foo')
        // 这样定义,在执行到sleep时会抛异常,
        // 因为这个task上面没定义过sleep的具体操作
        .step('sleep', 100)
        .pend()
        .step('sleep', 200)
        .step('mkdir', './bar')
        .sleep(function(n) {
            var root = this;
            setTimeout(function() {
                root.done();
            }, n);
        })
        // 这个pend的调用,使得mkdir方法传入的handle挂在了Stepify handles池中,
        // 所以第一个task调用mkdir方法不会抛异常
        .pend()
        .mkdir(function(p) {
            fs.mkdir(p, this.done.bind(this));
        })
        .run();
    • stepHandle的查找
    Stepify()
        .step('mkdir', './foo')
        // 定义当前task上的mkdirHandle,这里其实直接.step('mkdir', fn)更清晰
        .mkdir(function(p) {
            fs.mkdir(p, 0755, this.done.bind(this));
        })
        .step('sleep', 100)
        .pend()
        // 这个task上没定义mkdirHandle,会往Stepify类的handles池去找
        .step('mkdir', './bar')
        .step('sleep', 200)
        .pend()
        .sleep(function(n) {
            var root = this;
            setTimeout(function() {
                root.done();
            }, n);
        })
        .mkdir(function(p) {
            fs.mkdir(p, this.done.bind(this));
        })
        .run();

    error()

    描述:定制task的异常处理函数。

    调用:error(errorHandle)

    参数:

    • {Function} errorHandle 必传参数 默认会直接抛出异常并中断当前task的执行。每一个task都可以定制自己的errorHandle,亦可为所有task定制errorHandle。每个step执行如果出错会直接进入这个errorHandle,后面是否继续执行取决于errorHandle内部定义。errorHandle第一个参数便是具体异常信息。

    注意:errorHandle的执行环境是发生异常所在的那个step,也就是说Step类定义的所有方法在errorHandle内部均可用,您可以在异常时决定是否继续执行下一步,或者使用this.taskNamethis.name分别访问所属task的名称和step的名称,进而得到更详细的异常信息。

    例子:

    Stepify()
        .step(fn)
        .step(fn)
        // 这个task的异常会走到这里
        .error(function(err) {
            console.error('Error occurs when running task %s\'s %s step!', this.taskName, this.name);
            if(err.message.match(/can_ignore/)) {
                // 继续执行下一步
                this.next();
            } else {
                throw err;
            }
        })
        .pend()
        .step(fn)
        .step(fn)
        .pend()
        // 所有没显式定义errorHandle的所有task异常都会走到这里
        .error(function(err) {
            console.error(err.stack);
            res.send(500, 'Server error!');
        })
        .run();

    result()

    描述:所有task执行完之后,输出结果。在Stepify内部,会保存一份结果数组,通过step的fulfill方法可以将结果push到这个数组里,result执行的时候将这个数组传入finishHandle。

    调用:result(finishHandle)

    参数:

    • {Function} finishHandle,result本身是可选调用的,如果调用了result,则finishHandle是必传参数。

    例子:

    Stepify()
        .step(function() {
            var root = this;
            setTimeout(function() {
                root.fulfill(100);
                root.done(null);
            }, 100);
        })
        .step(function() {
            var root = this;
            fs.readFile(__filename, function(err, buf) {
                if(err) return root.done(err);
                root.fulfill(buf.toString());
                root.done();
            });
        })
        .result(function(r) {
            console.log(r); // [100, fs.readFileSync(__filename).toString()]
        })
        .run();

    run()

    描述:开始执行所定制的整个workflow。这里比较灵活,执行顺序可自行定制,甚至可以定义一个workflow,分多种模式执行。

    调用:run(*args)

    参数:

    • {Mix} 可选参数,类型可以是字符串(taskName)、数字(task定义的顺序,从0开始)、数组(指定哪些tasks可以并行),也可以混合起来使用(数组不支持嵌套)。默认是按照定义的顺序串行执行所有tasks。

    例子:

    function createTask() {
        return Stepify()
            .task('task1')
                .step(function() {
                    c1++;
                    fs.readdir(__dirname, this.wrap());
                })
                .step('sleep')
                .step('exec', 'cat', __filename)
            .task('task2')
                .step('sleep')
                .step(function() {
                    c2++;
                    var root = this;
                    setTimeout(function() {
                        root.done(null);
                    }, 1500);
                })
                .step('exec', 'ls', '-l')
            .task('task3')
                .step('readFile', __filename)
                .step('timer', function() {
                    c3++;
                    var root = this;
                    setTimeout(function() {
                        root.done();
                    }, 1000);
                })
                .step('sleep')
                .readFile(function(p) {
                    fs.readFile(p, this.done.bind(this));
                })
            .task('task4')
                .step('sleep')
                .step(function(p) {
                    c4++;
                    fs.readFile(p, this.wrap());
                }, __filename)
            .pend()
            .sleep(function() {
                console.log('Task %s sleep.', this.taskName);
                var root = this;
                setTimeout(function() {
                    root.done(null);
                }, 2000);
            })
            .exec(function(cmd, args) {
                cmd = [].slice.call(arguments, 0);
                var root = this;
                exec(cmd.join(' '), this.wrap());
            });
    };
     
    var modes = {
        'Default(serial)': [], // 10621 ms.
        'Customized-serial': ['task1', 'task3', 'task4', 'task2'], // 10624 ms.
        'Serial-mix-parallel-1': ['task1', ['task3', 'task4'], 'task2'], // 8622 ms.
        'Serial-mix-parallel-2': [['task1', 'task3', 'task4'], 'task2'], // 6570 ms.
        'Serial-mix-parallel-3': [['task1', 'task3'], ['task4', 'task2']], // 6576 ms.
        'All-parallel': [['task1', 'task3', 'task4', 'task2']], // 3552 ms.
        'Part-of': ['task2', 'task4'] // 5526 ms.
    };
     
    var test = function() {
        var t = Date.now();
        var task;
     
        Object.keys(modes).forEach(function(mode) {
            task = createTask();
     
            task.result = function() {
                console.log(mode + ' mode finished and took %d ms.', Date.now() - t);
            };
     
            task.run.apply(task, modes[mode]);
        });
     
        setTimeout(function() {
            log(c1, c2, c3 ,c4); // [6, 7, 6, 7]
        }, 15000);
    };
     
    test();

    done()

    描述:标识完成了一个异步操作(step)。

    调用:done([err, callback, *args])

    参数:

    • {String|Error|null} err 错误描述或Error对象实例。参数遵循Node.js的回调约定,可以不传参数,如果需要传递参数,则第一个参数必须是error对象。

    • {Function} callback 可选参数 自定义回调函数,默认是next,即执行下一步。

    • {Mix} *args 这个参数是传递给callback的参数,也就是作为下一步的动态参数。一般来说是将这一步的执行结果传递给下一步。

    例子:

    Stepify()
        .step(function() {
            var root = this;
            setTimeout(function() {
                root.done();
            }, 200);
        })
        .step(function() {
            var root = this;
            exec('curl "https://github.com/"', function(err, res) {
                // end this task in error occured
                if(err) root.end();
                else root.done(null, res);
            });
        })
        .step(function(res) {
            var root = this;
            setTimeout(function() {
                // do some stuff with res ...
                console.log(res);
                root.done();
            }, 100);
        })
        .run();

    wrap()

    描述:其实就是this.done.bind(this)的简写,包装done函数保证它的执行环境是当前step。比如原生的fs.readFile()的callback的执行环境被设置为nullfs.js#L91

    调用:wrap()

    参数:无

    例子:

    Stepify()
        .step(function() {
            fs.readdir(__dirname, this.done.bind(this));
        })
        .step(function() {
            fs.readFile(__filename, this.wrap());
        })
        .run();

    fulfill()

    描述:把step执行的结果推入结果队列,最终传入finishHandle。最终结果数组的元素顺序在传入给finishHandle时不做任何修改。

    调用:fulfill(*args)

    参数:

    • {Mix} 可选参数 可以是一个或者多个参数,会一一push到结果队列。

    例子:

    // Assuming retrieving user info
    Stepify()
        .step(function() {
            var root = this;
            db.getBasic(function(err, basic) {
                root.fulfill(basic || null);
                root.done(err, basic.id);
            });
        })
        .step(function(id) {
            var root = this;
            db.getDetail(id, function(err, detail) {
                root.fulfill(detail || null);
                root.done(err);
            });
        })
        .error(function(err) {
            console.error(err);
            res.send(500, 'Get user info error.');
        })
        .result(function(r) {
            res.render('user', {basic: r[0], detail: r[1]});
        })
        .run();

    vars()

    描述:暂存临时变量,在整个workflow的运行期可用。如果不想在workflow之外使用var申明别的变量,可以考虑用vars()。

    调用:vars(key[, value])

    参数:

    • {String} key 变量名。访问临时变量。

    • {Mix} value 变量值。如果只传入key则是访问变量,如果传入两个值则是写入变量并返回这个value。

    例子:

    Stepify()
        .step(function() {
            this.vars('foo', 'bar');
            // todo
        })
        .pend()
        .step(function() {
            // todo
            console.log(this.vars('foo')); // bar
        })
        .run();

    parallel()

    描述:简单的并发支持。这里还可以考虑引用其他模块(如:async)完成并行任务。

    调用:parallel(arr[, iterator, *args, callback])

    参数:

    • {Array} arr 必传参数。需要并行执行的一个数组,对于数组元素只有一个要求,就是如果有函数则所有元素都必须是一个函数。

    • {Function} iterator 如果arr参数是一个函数数组,这个参数是不用传的,否则是必传参数,它迭代运行arr的每一个元素。iterator的第一个参数是arr中的某一个元素,第二个参数是回调函数(callback),当异步执行完之后需要调用callback(err, data)

    • {Mix} *args 传递给iterator的参数,在迭代器执行的时候,arr数组的每一个元素作为iterator的第一个参数,*args则作为剩下的传入。

    • {Function} callback 可选参数(约定当最后一个参数是函数时认为它是回调函数) 默认是next。这个并行任务的执行结果会作为一个数组按arr中定义的顺序传入callback,如果执行遇到错误,则直接交给errHandle处理。

    例子:

    传入一个非函数数组(parallel(arr, iterator[, *arg, callback]))

    Stepify()
        .step(function() {
            fs.readdir(path.resolve('./test'), this.wrap());
        })
        .step(function(list) {
            list = list.filter(function(p) {return path.extname(p).match('js');});
            list.forEach(function(file, i) {list[i] = path.resolve('./test', file);});
            // 注释部分就相当于默认的this.next
            this.parallel(list, fs.readFile, {encoding: 'utf8'}/*, function(bufArr) {this.next(bufArr);}*/);
        })
        .step(function(bufArr) {
            // fs.writeFile('./combiled.js', Buffer.concat(bufArr), this.done.bind(this));
            // or
            this.parallel(bufArr, fs.writeFile.bind(this, './combiled.js'));
        })
        .run();

    传入函数数组(parallel(fnArr[]))

    Stepify()
        .step(function() {
            this.parallel([
                function(callback) {
                    fs.readFile(__filename, callback);
                },
                function(callback) {
                    setTimeout(function() {
                        callback(null, 'some string...');
                    }, 500);
                }
            ]);
        })
        .step(function(list) {
            console.log(list); // [fs.readFileSync(__filename), 'some string...']
            // todo...
        })
        .run();

    下面是一个应用到某项目里的例子:

    ...
    .step('fetch-images', function() {
        var root = this;
        var localQ = [];
        var remoteQ = [];
     
        // do dome stuff to get localQ and remoteQ
     
        this.parallel([
            function(callback) {
                root.parallel(localQ, function(frameData, cb) {
                    // ...
                    db.getContentType(frameData.fileName, function(type) {
                        var imgPath = frameData.fileName + '.' + type;
                        // ...
                        db.fetchByFileName(imgPath).pipe(fs.createWriteStream(targetUrl));
                        cb(null);
                    });
                }, function(r) {callback(null, r);});
            },
            function(callback) {
                root.parallel(remoteQ, function(frameData, cb) {
                    var prop = frames[frameData['frame']].children[frameData['elem']]['property'];
                    // ...
                    request({url: prop.src}, function(err, res, body) {
                        // ...
                        cb(null);
                    }).pipe(fs.createWriteStream(targetUrl));
                }, function(r) {callback(null, r);});
            },
        ]);
    })
    ...

    jump()

    描述:在step之间跳转。这样会打乱step的执行顺序,谨慎使用jump,以免导致死循环

    调用:jump(index|stepName)

    参数:

    • {Number} index 要跳转的step索引。在step创建的时候会自建一个索引属性,使用this._index可以访问它。

    • {String} stepName step创建时传入的名称。

    例子:

    Stepify()
        .step('a', fn)
        .step('b', fn)
        .step(function() {
            if(!this.vars('flag')) {
                this.jump('a');
                this.vars('flag', 1)
            } else {
                this.next();
            }
     
            // 其他的异步操作
        })
        .step('c', fn)
        .run();

    next()

    描述:显式调用下一个step,并将数据传给下一step(即下一个step的动态参数)。其实等同于done(null, *args)。

    调用:next([*args])

    参数:

    • {Mix} *args 可选参数 类型不限,数量不限。

    例子:

    Stepify()
        .step(function() {
            // do some stuff ...
            this.next('foo', 'bar');
        })
        .step(function(a, b, c) {
            a.should.equal('test');
            b.should.equal('foo');
            c.should.equal('bar');
        }, 'test')
        .run();

    end()

    描述:终止当前task的执行。如果遇到异常并传递给end,则直接交给errorHandle,和done一样。不传或者传null则跳出所在task执行下一个task,没有则走到result,没有定义result则退出进程。

    调用:end(err)

    参数:

    • {Error|null} err 可选参数, 默认null。

    例子:

    Stepify()
        .step(fn)
        .step(function() {
            if(Math.random() > 0.5) {
                this.end();
            } else {
                // todo ...
            }
        })
        .step(fn)
        .run();

    最后,欢迎fork或者提交bug

    License

    MIT http://rem.mit-license.org

    install

    npm i stepify

    Downloadsweekly downloads

    8

    version

    0.1.5

    license

    MIT

    repository

    githubgithub

    last publish

    collaborators

    • avatar