资讯专栏INFORMATION COLUMN

JavaScript 异步队列及Co实现

LdhAndroid / 2229人阅读

摘要:在中,又由于单线程的原因,异步编程又是非常重要的。方法有很多,,,观察者,,,这些中处理异步编程的,都可以做到这种串行的需求。

引入

队列对于任何语言来说都是重要的,io 的串行,请求的并行等等。在 JavaScript 中,又由于单线程的原因,异步编程又是非常重要的。昨天由一道面试题的启发,我去实现 JS 中的异步队列的时候,借鉴了 express 中间件思想,并发散到 co 实现 与 generator,以及 asyncToGenerator。

本次用例代码都在此,可以 clone 下来试一下

异步队列

很多面试的时候会问一个问题,就是怎么让异步函数可以顺序执行。方法有很多,callback,promise,观察者,generator,async/await,这些 JS 中处理异步编程的,都可以做到这种串行的需求。但是很麻烦的是,处理起来是挺麻烦的,你要不停的手动在上一个任务调用下一个任务。比如 promise,像这样:

a.then(() => b.then(() => c.then(...)))

代码嵌套的问题,有点严重。所以要是有一个队列就好了,往队列里添加异步任务,执行的时候让队列开始 run 就好了。先制定一下 API,我们有一个 queue,队列都在内部维护,通过 queue.add 添加异步任务,queue.run 执行队列,可以先想想。

参照之前 express 中间件的实现,给异步任务 async-fun 传入一个 next 方法,只有调用 next,队列才会继续往下走。那这个 next 就至关重要了,它会控制队列往后移一位,执行下一个 async-fun。我们需要一个队列,来保存 async-fun,也需要一个游标,来控制顺序。

以下是我的简单实现:

const queue = () => {
  const list = []; // 队列
  let index = 0;  // 游标

  // next 方法
  const next = () => {
    if (index >= list.length - 1) return;    
    
    // 游标 + 1
    const cur = list[++index];
    cur(next);
  }
  
  // 添加任务
  const add = (...fn) => {
    list.push(...fn);
  }

  // 执行
  const run = (...args) => {
    const cur = list[index];
    typeof cur === "function" && cur(next);
  }

  // 返回一个对象
  return {
    add,
    run,
  }
}

// 生成异步任务
const async = (x) => {
  return (next) => {// 传入 next 函数
    setTimeout(() => {
      console.log(x);
      next();  // 异步任务完成调用
    }, 1000);
  }
}

const q = queue();
const funs = "123456".split("").map(x => async(x));
q.add(...funs);
q.run();// 1, 2, 3, 4, 5, 6 隔一秒一个。

我这里没去构造一个 class,而是通过闭包的特性去处理的。queue 方法返回一个包含 add,run 的对象,add 即为像队列中添加异步方法,run 就是开始执行。在 queue 内部,我们定义了几个变量,list 用来保存队列,index 就是游标,表示队列现在走到哪个函数了,另外,最重要的是 next 方法,它是控制游标向后移动的。

run 函数一旦执行,队列即开始 run。一开始执行队列里的第一个 async 函数,我们把 next 函数传给了它,然后由 async 函数决定什么时候执行 next,即开始执行下一个任务。我们没有并不知道异步任务什么时候才算完成,只能通过打成某种共识,来告知 queue 某个任务完成。就是传给任务的 next 函数。其实 async 返回的这个函数,有一个名字,叫 Thunk,后面我们会简单介绍。

Thunk

thunk 其实是为了解决 “传名调用” 的。就是我传给函数 A 一个表达式作参数 x + 1,但是我不确定这个 x + 1 什么时候会用到,以及会不会用到,如果在传入就执行,这个求值是没有必要的。所以就出现了一个临时函数 Thunk,来保存这个表达式,传入函数 A 中,待需要时再调用。

const thunk = () => {
  return x + 1;
};

const A = thunk => {
  return thunk() * 2;
}

嗯... 其实就是一个回调函数...

暂停

其实只要某个任务,不继续调用 next,队列就已经不会继续往下走了。比如我们 async 任务里加一个判断(通常是异步 io,请求的容错处理):

// queue 函数不变,
// async 加限制条件
const async = (x) => {
  return (next) => {
    setTimeout(() => {
      if(x > 3) {
        console.log(x);
        q.run();  //重试
        return;
      }
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = "123456".split("").map(x => async(x));
q.add(...funs);
q.run();
//打印结果: 1, 2, 3, 4, 4,4, 4,4 一直是 4

当执行到第四个任务的时候,x 是 4 的时候,不再继续,就可以直接 return,不再调用 next。也有可能是出现错误,我们需要再重试,那就再调用 q.run 就可以了,因为游标保存的就是当前的 async 任务的索引。

另外,还有一种方式,就是添加 stop 方法。虽然感觉上面的方法就 OK 了,但是 stop 的好处在于,你可以主动的停止队列,而不是在 async 任务里加限制条件。当然,有暂停就有继续了,两种方式,一个是 retry,就是重新执行上一次暂停的那个;另一个就是 goOn,不管上次最后一个如何,继续下一个。上代码:

const queue = () => {
  const list = [];
  let index = 0;
  let isStop = false;

  const next = () => {
    // 加限制
    if (index >= list.length - 1 || isStop) return;    
    const cur = list[++index];
    cur(next);
  }

  const add = (...fn) => {
    list.push(...fn);
  }

  const run = (...args) => {
    const cur = list[index];
    typeof cur === "function" && cur(next);
  }

  const stop = () => {
    isStop = true;
  }

  const retry = () => {
    isStop = false;
    run();
  }

  const jump = () => {
    isStop = false;
    next();
  }

  return {
    add,
    run,
    stop,
    retry,
    goOn,
  }
}

const async = (x) => {
  return (next) => {
    setTimeout(() => {
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = "123456".split("").map(x => async(x));
q.add(...funs);
q.run();

setTimeout(() => {
  q.stop();
}, 3000)


setTimeout(() => {
  q.goOn();
}, 5000)

其实还是加拦截... 只不过从 async 函数中,换到了 next 函数里面,利用 isStop 这个变量切换 true/false,开关暂停。我加了两个定时器,一个是 3 秒后暂停,一个是 5 秒后继续,(请忽略定时器的误差),按道理应该是队列到三秒的时候,也就是第三个任务执行完暂停,然后再隔 2 秒,继续。结果打印到 3 的时候,停住,两秒之后继续 4,5,6.

两种思路,请结合场景思考问题。

并发

上面的都是在做串行,假如 run 的时候我要并行呢... 也很简单,把队列一次性跑完就可以了。

// 为了代码短一些,把 retry,goOn 先去掉了。

const queue = () => {
  const list = [];
  let index = 0;
  let isStop = false;
  let isParallel = false;

  const next = () => {
    if (index >= list.length - 1 || isStop || isParallel) return;    
    const cur = list[++index];
    cur(next);
  }

  const add = (...fn) => {
    list.push(...fn);
  }

  const run = (...args) => {
    const cur = list[index];
    typeof cur === "function" && cur(next);
  }

  const parallelRun = () => {
    isParallel = true;
    for(const fn of list) {
      fn(next);
    }
  }

  const stop = () => {
    isStop = true;
  }
  
  return {
    add,
    run,
    stop,
    parallelRun,
  }
}

const async = (x) => {
  return (next) => {
    setTimeout(() => {
      console.log(x);
      next();
    }, 1000);
  }
}

const q = queue();
const funs = "123456".split("").map(x => async(x));
q.add(...funs);
q.parallelRun();
// 一秒后全部输出 1, 2, 3, 4, 5, 6

我添加了一个 parallelRun 方法,用于并行,我觉得还是不要放到 run 函数里面了,抽象单元尽量细化还是。然后还加了一个 isParallel 的变量,默认是 false,考虑到 next 函数有可能会被调用,所以需要加一个拦截,保证不会处乱。

以上就是利用仅用 thunk 函数,结合 next 实现的异步队列控制器,queue,跟你可以把 es6 代码都改成 es5,保证兼容,当然是足够简单的,不适用于负责的场景 ?,仅提供思路。

generator 与 co

为什么要介绍 generator,首先它也是用来解决异步回调的,另外它的使用方式也是调用 next 函数,generator 才会往下执行,默认是暂停状态。yield 就相当于上面的 q.add,往队列中添加任务。所以我也打算一起介绍,来更好的拓宽思路。发散思维,相似的知识点做好归纳,然后某一天你就会突然有一种:原来是这么回事,原来 xxx 是借鉴子 yyy,然后你又去研究 yyy - -。

简介 generator

简单介绍回顾一下,因为有同学不经常用,肯定会有遗忘。

// 一个简单的栗子,介绍它的用法

function* gen(x) {
  const y = yield x + 1;
  console.log(y, "here"); // 12
  return y;
}

const g = gen(1);
const value = g.next().value; // {value: 2, done: false}

console.log(value); // 2
console.log(g.next(value + 10)); // {value: 12, done: true}

首先生成器其实就是一个通过函数体内部定义迭代算法,然后返回一个 iterator 对象。关于iterator,可以看我另一篇文章。
gen 执行返回一个对象 g,而不是返回结果。g 跟其他 iterator 一样,通过调用 next 方法,保证游标 + 1,并且返回一个对象,包含了 value(yield 语句的结果),和 done(迭代器是否完成)。另外,yield 语句的值,比如上面代码中的 y,是下一次调用 next 传入的参数,也就是 value + 10,所以是 12.这样设计是有好处的,因为这样你就可以在 generator 内部,定义迭代算法的时候,拿到上次的结果(或者是处理后的结果)了。

但是 generator 有一个弊端就是不会自动执行,TJ 大神写了一个 co,来自动执行 generator,也就是自动调用 next。它要求 yield 后面的函数/语句,必须是 thunk 函数或者是 promise 对象,因为只有这样才会串联执行完,这跟我们最开始实现 queue 的思路是一样的。co 的实现有两种思想,一个是 thunk,一个是 promise,我们都来试一下。

Thunk 实现

还记得最开始的 queue 怎么实现的吗,内部定义 next 函数,来保证游标的前进,async 函数会接收 next,去执行 next。到这里是一样的,我们只要在 co 函数内部定义一个同样的 next 函数,来保证继续执行,那么 generator 是没有提供索引的,不过它提供了 g.next 函数啊,所以我们只需要给 async 函数传 g.next 不就好了,async 就是 yield 后面的语句啊,也就是 g.value。但是并不能直接传 g.next,为什么?因为下一次的 thunk 函数,要通过 g.next 的返回值 value 取到啊,木有 value,下一个 thunk 函数不就没了... 所以我们还是需要定义一个 next 函数去包装一下的。

上代码:

const coThunk = function(gen, ...params) {

  const g = gen(...params);
  
  const next = (...args) => { // args 用于接收参数
    const ret = g.next(...args);   // args 传给 g.next,即赋值给上一个 yield 的值。
    if(!ret.done) { // 去判断是否完成
      ret.value(next);  // ret.value 就是下一个 thunk 函数
    }
  }

  next(); // 先调用一波
}

// 返回 thunk 函数的 asyncFn
const asyncFn = (x) => {
  return (next) => { // 接收 next
    const data = x + 1;
    setTimeout(() => {
      next && next(data);
    }, 1000)
  }
}

const gen = function* (x) {
  const a = yield asyncFn(x);
  console.log(a);

  const b = yield asyncFn(a);
  console.log(b);

  const c = yield asyncFn(b);
  console.log(c);

  const d = yield asyncFn(c);
  console.log(d);

  console.log("done");
}

coThunk(gen, 1);
// 2, 3, 4, 5, done

这里定义的 gen,功能很简单,就是传入参数 1,然后每个 asyncFn 异步累加,即多个异步操作串行,并且下一个依赖上一个的返回值。

promise 实现

其实思路都是一样的,只不过调用 next,换到了 co 内部。因为 yield 后面的语句是 promise 对象的话,我们可以在 co 内部拿到了,然后在 g.next().value 的 then 语句执行 next 就好了。

// 定义 co
const coPromise = function(gen) {
// 为了执行后的结果可以继续 then
  return new Promise((resolve, reject) => {
    const g = gen();
    
    const next = (data) => { // 用于传递,只是换个名字
      const ret = g.next(data);
      if(ret.done) { // done 后去执行 resolve,即co().then(resolve)
        resolve(data); // 最好把最后一次的结果给它
        return;
      }
      ret.value.then((data) => { // then 中的第一个参数就是 promise 对象中的 resolve,data 用于接受并传递。
        next(data);  //调用下一次 next
      })
    }

    next();
  })
}

const asyncPromise = (x) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(x + 1);
    }, 1000)
  })
}

const genP = function* () {
  const data1 = yield asyncPromise(1);
  console.log(data1);

  const data2 = yield asyncPromise(data1);
  console.log(data2);

  const data3 = yield asyncPromise(data2);
  console.log(data3);
}

coPromise(genP).then((data) => {
  setTimeout(() => {
    console.log(data + 1); // 5
  }, 1000)
});
// 一样的 2, 3, 4, 5

其实 co 的源码就是通过这两种思路实现的,只不过它做了更多的 catch 错误的处理,而且支持你 yield 一个数组,对象,通过 promise.all 去实现。另外 yield thunk 函数的时候,它统一转成 promise 去处理了。感兴趣的可以去看一下 co,相信现在一定很明朗了。

async/await

现在 JS 中用的最常用的异步解决方案了,不过 async 也是基于 generator 的实现,只不过是做了封装。如果把 async/await 转化成 generate/yield,只需要把 await 语法换成 yield,再扔到一个 generate 函数中,async 的执行换成 coPromise(gennerate) 就好了。

const asyncPromise = (x) => {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(x + 1);
    }, 1000)
  })
}

async function fn () {
  const data = await asyncPromise(1);
  console.log(data);
}
fn();

// 那转化成 generator 可能就是这样了。 coPromise 就是上面的实现
function* gen() {
  const data = yield asyncPromise(1);
  console.log(data);
}

coPromise(gen);

asyncToGenerator 就是这样的原理,事实上 babel 也是这样转化的。

最后

我首先是通过 express 的中间件思想,实现了一个 JS 中需求常见的 queue (异步队列解决方案),然后再接着去实现一个简单的 coThunk,最后把 thunk 换成 promise。因为异步解决方案在 JS 中是很重要的,去使用现成的解决方案的时候,如果能去深入思考一下实现的原理,我相信是有助于我们学习进步的。

欢迎 star 个人 blog:https://github.com/sunyongjia... ?

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/88768.html

相关文章

  • Node.js 异步异闻录

    摘要:的异步完成整个异步环节的有事件循环观察者请求对象以及线程池。执行回调组装好请求对象送入线程池等待执行,实际上是完成了异步的第一部分,回调通知是第二部分。异步编程是首个将异步大规模带到应用层面的平台。 showImg(https://segmentfault.com/img/remote/1460000011303472); 本文首发在个人博客:http://muyunyun.cn/po...

    zzbo 评论0 收藏0
  • 总结javascript基础概念(二):事件队列循环

    摘要:而事件循环是主线程中执行栈里的代码执行完毕之后,才开始执行的。由此产生的异步事件执行会作为任务队列挂在当前循环的末尾执行。在下,观察者基于监听事件的完成情况在下基于多线程创建。 主要问题: 1、JS引擎是单线程,如何完成事件循环的? 2、定时器函数为什么计时不准确? 3、回调与异步,有什么联系和不同? 4、ES6的事件循环有什么变化?Node中呢? 5、异步控制有什么难点?有什么解决方...

    zhkai 评论0 收藏0
  • javascript 异步编程

    摘要:执行栈清空后,检查微任务队列,将可执行的微任务全部执行。对象的错误具有冒泡性质,会一直向后传递,直到被捕获为止。返回的遍历器对象,可以依次遍历函数内部的每一个状态。表示函数里有异步操作,表示紧跟在后面的表达式需要等待结果。 javascript 是单线程执行的,由js文件自上而下依次执行。即为同步执行,若是有网络请求或者定时器等业务时,不能让浏览器傻傻等待到结束后再继续执行后面的js吧...

    Nino 评论0 收藏0
  • 《Node.js设计模式》基于ES2015+的回调控制流

    摘要:以下展示它是如何工作的函数使用构造函数创建一个新的对象,并立即将其返回给调用者。在传递给构造函数的函数中,我们确保传递给,这是一个特殊的回调函数。 本系列文章为《Node.js Design Patterns Second Edition》的原文翻译和读书笔记,在GitHub连载更新,同步翻译版链接。 欢迎关注我的专栏,之后的博文将在专栏同步: Encounter的掘金专栏 知乎专栏...

    LiuRhoRamen 评论0 收藏0
  • 夯实基础-JavaScript异步编程

    摘要:调用栈被清空,消息队列中并无任务,线程停止,事件循环结束。不确定的时间点请求返回,将设定好的回调函数放入消息队列。调用栈执行完毕执行消息队列任务。请求并发回调函数执行顺序无法确定。 异步编程 JavaScript中异步编程问题可以说是基础中的重点,也是比较难理解的地方。首先要弄懂的是什么叫异步? 我们的代码在执行的时候是从上到下按顺序执行,一段代码执行了之后才会执行下一段代码,这种方式...

    shadowbook 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<