资讯专栏INFORMATION COLUMN

async源码阅读笔记

Kaede / 2299人阅读

摘要:调度器有了迭代器,还需要一个调度器才能按照预期的流程串行执行需要的函数,同时处理参数传递的过程我自己写的代码,调度的工作是由一起做的。

本文摘自我的博客,欢迎大家去逛逛。

又是两周没写博客了,圣诞夜来水一发~
今天稍微看了下async的源码,感觉很简短精炼,总共也才1000多行代码,好多值得学习的地方。主要看的是waterfall模块,由于源码中有好多不同接口公用的部分,因此看完waterfall这个接口的整个流程,差不多就cover了一半的async源码了。

造轮子

在没有太多使用经验的情况下,直接看源码,可能会遇到一些不明所以的细节,看了可能也只能吸收很少的一部分。最好的方式我觉得莫过于自己先造一遍轮子,再看源码了。

接口需求

waterfall这个接口的命名还是很形象的

我要定义一个waterfall函数,满足以下需求:

可以按照Array给定的顺序逐个执行

所有函数执行完毕后,调用指定的回调函数

前一个函数的输出作为后一个函数的输入

中途某一个函数执行失败,直接调用回调函数结束

需求的代码描述如下:

async.waterfall([
    function(callback) {
        callback(null, "one", "two");
    },
    function(arg1, arg2, callback) {
        console.log(arg1);
        console.log(arg2);
          // arg1 now equals "one" and arg2 now equals "two"
        callback(null, "three");
    },
    function(arg1, callback) {
        console.log(arg1);
        // arg1 now equals "three"
        callback(null, "done");
    }
], function (err, result) {
    // result now equals "done"
    console.log(result);
});

// 期望输出:
// one
// two
// three
// done
编码

代码组织了好一会儿,又调试了好一会后(中间遇到了一个关于arguments的坑,后面会讲),终于成型了。输出是按照预期的,和async源码运行的结果相同,分析也都写在注释中:

var async = {};
async.waterfall = function (tasks, cb){
    // 指向下一个将要执行的函数
    var _index = 0;

    /**
     * 调用用户指定的函数
     */
    function _run(index, args, cb){
        var task = tasks[index];
        args.push(cb);
        task.apply(null, args);
    };

    /**
     * 因为涉及到控制流的转移,从框架转移到用户,再从用户转移到框架。
     * 需要定义一个传递控制流的使者,就是这个_cb函数
     * 1.框架转移到用户:调用用户函数的同时,把_cb作为参数
     * 2.用户转移到框架:用户调用这个_cb,表明已执行完该函数,把控制交给框架。抑或结束,抑或执行下一个函数
     */
    function _cb(){
        // 如果错误了,直接回调最外层的cb
        // 如果是最后一个,也直接调用最外层的cb
        if (arguments[0] || _index === tasks.length) {
            return cb && cb.apply(null, arguments);
        }

        /**
         * 取出回调参数,作为下一个函数的输入
         * 因为回调的第一个参数是错误码,所以要去掉第一个
         */
        // var rest = arguments.slice(1); //arguments并没有slice方法,因此这样会报错
        var rest = [].slice.call(arguments, 1);

        _run(_index++, rest, _cb);
    };

    // 如果用户没有指定要串行执行的函数,则直接调用回调
    if (tasks.length === 0) return cb && cb();
    _run(_index++, [], _cb);
};

踩的这个坑是关于arguments的(在ES6语法中其实不推荐使用arguments的方式,因为语法已经支持了rest param)。我一直以为一个函数的arguments属性是一个Array,因为经常可以看到通过arguments[0]的方式去获取参数,也从来没有质疑过。先来看看下面这一个例子:

function a (){
    console.log(typeof arguments);
    console.log(arguments);
    console.log(arguments[0]);
    console.log(arguments["0"]);
    console.log(arguments.length);
    console.log([].slice.call(arguments, 1));
};

a("one", "two", "three");

/**
 * 输出(chrome):
 * object
 * ["one", "two", "three"]
 * one
 * one
 * 3
 * ["two", "three"]
 *
 * 输出(node.js)
 * object
 * { "0": "one", "1": "two", "2": "three" }
 * one
 * one
 * 3
 * [ "two", "three" ]
 */

可以看出,arguments对象并不是一个array对象。在chrome中虽然看上去打印出来的是Array,但它是可以展开的,里面还有好多参数。而且下标取值的时候不光可以用数字,也可以用字符串来取值。这也是为什么我写的代码注释中arguments.slice(1);的方式会执行错误(slice是Array才有的方法)。但是[].slice.call(arguments, 1);却能执行,说明arguments还是有一点slice的特性的,有点不太懂。感觉它同时继承了dict和array两种对象的部分特性。

原来的轮子

贴上原来的代码实现:

async.waterfall = function (tasks, callback) {
    // 这种方式也是很聪明的一种方式,可以代替 callback && callback()的方式
    // noop 是一个空函数,什么也不执行
    callback = _once(callback || noop);

    if (!_isArray(tasks)) {
        var err = new Error("First argument to waterfall must be an array of functions");
        return callback(err);
    }
    if (!tasks.length) {
        return callback();
    }
    function wrapIterator(iterator) {
        return _restParam(function (err, args) {
            if (err) {
                callback.apply(null, [err].concat(args));
            }
            else {
                var next = iterator.next();
                if (next) {
                    args.push(wrapIterator(next));
                }
                else {
                    args.push(callback);
                }
                ensureAsync(iterator).apply(null, args);
            }
        });
    }
    wrapIterator(async.iterator(tasks))();
};

抛开一些异常处理的情况,就总体逻辑流程上还是有些区别的,下面就逐个来分析一下。

迭代器

我是自己通过_index的局部变量来记录当前执行的函数的(得益于闭包的特性,这个局部变量可以一直保留着)。源码实现了一种迭代器的方式去管理传入的函数数组,非常优雅,支持next特性,观摩一下:

async.iterator = function (tasks) {
    function makeCallback(index) {
        function fn() {
            if (tasks.length) {
                tasks[index].apply(null, arguments);
            }
            return fn.next();
        }
        fn.next = function () {
            return (index < tasks.length - 1) ? makeCallback(index + 1): null;
        };
        return fn;
    }
    return makeCallback(0);
};

通过async.iterator包装以后返回的是一个迭代器对象,他同时又是一个函数可以直接执行,包装了用户传入的tasks中的第一个函数。

调度器

有了迭代器,还需要一个调度器才能按照预期的流程串行执行需要的函数,同时处理参数传递的过程(我自己写的代码,调度的工作是由_cb一起做的)。

这个调度器实现的非常棒,由于它返回的也是一个函数,因此和迭代器是属于同一个维度的(如果是调用者和被调用者的关系则不属于同一维度,他们的调用层次关系是同一层的)。_restParam函数可以暂时不用管它,因为从它的实现中可以看到,它本身和它参数中的函数是同一个维度的,它只是负责转换了一下参数的结构。完全可以理解为wrapIterator返回的就是被_restParam包着的那个函数,_restParam只是一个参数结构的转换器,处理了参数结构不一致的问题。

function _restParam(func, startIndex) {
    startIndex = startIndex == null ? func.length - 1 : +startIndex;
    return function() {
        var length = Math.max(arguments.length - startIndex, 0);
        var rest = Array(length);
        for (var index = 0; index < length; index++) {
            rest[index] = arguments[index + startIndex];
        }
        switch (startIndex) {
            case 0: return func.call(this, rest);
            case 1: return func.call(this, arguments[0], rest);
        }
        // Currently unused but handle cases outside of the switch statement:
        // var args = Array(startIndex + 1);
        // for (index = 0; index < startIndex; index++) {
        //     args[index] = arguments[index];
        // }
        // args[startIndex] = rest;
        // return func.apply(this, args);
    };
}

回到调度器的上下文,在参数传递的过程中,args是上一个函数的返回结果组成的数组,再把下一个迭代器包装一下作为该数组的最后一个元素。这样在调用当前迭代器对应的函数的时候,用户态上下文中的callback就是下一个用户态函数对应的迭代器了。整个控制流程完全处在用户层,框架层所做的事仅仅是参数结构的转换(毕竟apply函数需要的参数结构是数组,而函数调用的时候则是展开的形式)。

奇淫技巧

在阅读代码的过程中看到了不少巧妙的用法

导出

在async源码最后有这样一段代码:

// Node.js
if (typeof module === "object" && module.exports) {
   module.exports = async;
}
// AMD / RequireJS
else if (typeof define === "function" && define.amd) {
   define([], function () {
       return async;
   });
}
// included directly via                 
阅读需要支付1元查看
<