摘要:调度器有了迭代器,还需要一个调度器才能按照预期的流程串行执行需要的函数,同时处理参数传递的过程我自己写的代码,调度的工作是由一起做的。
本文摘自我的博客,欢迎大家去逛逛。
又是两周没写博客了,圣诞夜来水一发~
今天稍微看了下async的源码,感觉很简短精炼,总共也才1000多行代码,好多值得学习的地方。主要看的是waterfall模块,由于源码中有好多不同接口公用的部分,因此看完waterfall这个接口的整个流程,差不多就cover了一半的async源码了。
在没有太多使用经验的情况下,直接看源码,可能会遇到一些不明所以的细节,看了可能也只能吸收很少的一部分。最好的方式我觉得莫过于自己先造一遍轮子,再看源码了。
接口需求waterfall这个接口的命名还是很形象的
我要定义一个waterfall函数,满足以下需求:
可以按照Array给定的顺序逐个执行
所有函数执行完毕后,调用指定的回调函数
前一个函数的输出作为后一个函数的输入
中途某一个函数执行失败,直接调用回调函数结束
需求的代码描述如下:
async.waterfall([ function(callback) { callback(null, "one", "two"); }, function(arg1, arg2, callback) { console.log(arg1); console.log(arg2); // arg1 now equals "one" and arg2 now equals "two" callback(null, "three"); }, function(arg1, callback) { console.log(arg1); // arg1 now equals "three" callback(null, "done"); } ], function (err, result) { // result now equals "done" console.log(result); }); // 期望输出: // one // two // three // done编码
代码组织了好一会儿,又调试了好一会后(中间遇到了一个关于arguments的坑,后面会讲),终于成型了。输出是按照预期的,和async源码运行的结果相同,分析也都写在注释中:
var async = {}; async.waterfall = function (tasks, cb){ // 指向下一个将要执行的函数 var _index = 0; /** * 调用用户指定的函数 */ function _run(index, args, cb){ var task = tasks[index]; args.push(cb); task.apply(null, args); }; /** * 因为涉及到控制流的转移,从框架转移到用户,再从用户转移到框架。 * 需要定义一个传递控制流的使者,就是这个_cb函数 * 1.框架转移到用户:调用用户函数的同时,把_cb作为参数 * 2.用户转移到框架:用户调用这个_cb,表明已执行完该函数,把控制交给框架。抑或结束,抑或执行下一个函数 */ function _cb(){ // 如果错误了,直接回调最外层的cb // 如果是最后一个,也直接调用最外层的cb if (arguments[0] || _index === tasks.length) { return cb && cb.apply(null, arguments); } /** * 取出回调参数,作为下一个函数的输入 * 因为回调的第一个参数是错误码,所以要去掉第一个 */ // var rest = arguments.slice(1); //arguments并没有slice方法,因此这样会报错 var rest = [].slice.call(arguments, 1); _run(_index++, rest, _cb); }; // 如果用户没有指定要串行执行的函数,则直接调用回调 if (tasks.length === 0) return cb && cb(); _run(_index++, [], _cb); };坑
踩的这个坑是关于arguments的(在ES6语法中其实不推荐使用arguments的方式,因为语法已经支持了rest param)。我一直以为一个函数的arguments属性是一个Array,因为经常可以看到通过arguments[0]的方式去获取参数,也从来没有质疑过。先来看看下面这一个例子:
function a (){ console.log(typeof arguments); console.log(arguments); console.log(arguments[0]); console.log(arguments["0"]); console.log(arguments.length); console.log([].slice.call(arguments, 1)); }; a("one", "two", "three"); /** * 输出(chrome): * object * ["one", "two", "three"] * one * one * 3 * ["two", "three"] * * 输出(node.js) * object * { "0": "one", "1": "two", "2": "three" } * one * one * 3 * [ "two", "three" ] */
可以看出,arguments对象并不是一个array对象。在chrome中虽然看上去打印出来的是Array,但它是可以展开的,里面还有好多参数。而且下标取值的时候不光可以用数字,也可以用字符串来取值。这也是为什么我写的代码注释中arguments.slice(1);的方式会执行错误(slice是Array才有的方法)。但是[].slice.call(arguments, 1);却能执行,说明arguments还是有一点slice的特性的,有点不太懂。感觉它同时继承了dict和array两种对象的部分特性。
原来的轮子贴上原来的代码实现:
async.waterfall = function (tasks, callback) { // 这种方式也是很聪明的一种方式,可以代替 callback && callback()的方式 // noop 是一个空函数,什么也不执行 callback = _once(callback || noop); if (!_isArray(tasks)) { var err = new Error("First argument to waterfall must be an array of functions"); return callback(err); } if (!tasks.length) { return callback(); } function wrapIterator(iterator) { return _restParam(function (err, args) { if (err) { callback.apply(null, [err].concat(args)); } else { var next = iterator.next(); if (next) { args.push(wrapIterator(next)); } else { args.push(callback); } ensureAsync(iterator).apply(null, args); } }); } wrapIterator(async.iterator(tasks))(); };
抛开一些异常处理的情况,就总体逻辑流程上还是有些区别的,下面就逐个来分析一下。
迭代器我是自己通过_index的局部变量来记录当前执行的函数的(得益于闭包的特性,这个局部变量可以一直保留着)。源码实现了一种迭代器的方式去管理传入的函数数组,非常优雅,支持next特性,观摩一下:
async.iterator = function (tasks) { function makeCallback(index) { function fn() { if (tasks.length) { tasks[index].apply(null, arguments); } return fn.next(); } fn.next = function () { return (index < tasks.length - 1) ? makeCallback(index + 1): null; }; return fn; } return makeCallback(0); };
通过async.iterator包装以后返回的是一个迭代器对象,他同时又是一个函数可以直接执行,包装了用户传入的tasks中的第一个函数。
调度器有了迭代器,还需要一个调度器才能按照预期的流程串行执行需要的函数,同时处理参数传递的过程(我自己写的代码,调度的工作是由_cb一起做的)。
这个调度器实现的非常棒,由于它返回的也是一个函数,因此和迭代器是属于同一个维度的(如果是调用者和被调用者的关系则不属于同一维度,他们的调用层次关系是同一层的)。_restParam函数可以暂时不用管它,因为从它的实现中可以看到,它本身和它参数中的函数是同一个维度的,它只是负责转换了一下参数的结构。完全可以理解为wrapIterator返回的就是被_restParam包着的那个函数,_restParam只是一个参数结构的转换器,处理了参数结构不一致的问题。
function _restParam(func, startIndex) { startIndex = startIndex == null ? func.length - 1 : +startIndex; return function() { var length = Math.max(arguments.length - startIndex, 0); var rest = Array(length); for (var index = 0; index < length; index++) { rest[index] = arguments[index + startIndex]; } switch (startIndex) { case 0: return func.call(this, rest); case 1: return func.call(this, arguments[0], rest); } // Currently unused but handle cases outside of the switch statement: // var args = Array(startIndex + 1); // for (index = 0; index < startIndex; index++) { // args[index] = arguments[index]; // } // args[startIndex] = rest; // return func.apply(this, args); }; }
回到调度器的上下文,在参数传递的过程中,args是上一个函数的返回结果组成的数组,再把下一个迭代器包装一下作为该数组的最后一个元素。这样在调用当前迭代器对应的函数的时候,用户态上下文中的callback就是下一个用户态函数对应的迭代器了。整个控制流程完全处在用户层,框架层所做的事仅仅是参数结构的转换(毕竟apply函数需要的参数结构是数组,而函数调用的时候则是展开的形式)。
奇淫技巧在阅读代码的过程中看到了不少巧妙的用法
在async源码最后有这样一段代码:
// Node.js if (typeof module === "object" && module.exports) { module.exports = async; } // AMD / RequireJS else if (typeof define === "function" && define.amd) { define([], function () { return async; }); } // included directly via