资讯专栏INFORMATION COLUMN

白洁血战Node并发编程 - 预览

Lemon_95 / 914人阅读

摘要:在这些动作结束后,所有的队列变化,就是整个组合任务状态机的下一个状态。如果组合状态机停止了,向其中的任何一个对象执行或者操作都可以让整个状态机继续动起来。

预览。

先给出一个基础类代码。

const EventEmitter = require("events")
const debug = require("debug")("transform")

class Transform extends EventEmitter {

  constructor (options) {
    super()
    this.concurrency = 1

    Object.assign(this, options)

    this.pending = []
    this.working = []
    this.finished = []
    this.failed = []

    this.ins = []
    this.outs = []
  }

  push (x) {
    this.pending.push(x)
    this.schedule()
  }

  pull () {
    let xs = this.finished
    this.finished = []
    this.schedule()
    return xs
  }

  isBlocked () {
    return !!this.failed.length ||              // blocked by failed
      !!this.finished.length ||                 // blocked by output buffer (lazy)
      this.outs.some(t => t.isBlocked())        // blocked by outputs transform
  }

  isStopped () {
    return !this.working.length && this.outs.every(t => t.isStopped())
  }

  root () {
    return this.ins.length === 0 ? this : this.ins[0].root()
  }

  pipe (next) {
    this.outs.push(next)
    next.ins.push(this)
    return next
  }

  print () {
    debug(this.name,
      this.pending.map(x => x.name),
      this.working.map(x => x.name),
      this.finished.map(x => x.name),
      this.failed.map(x => x.name),
      this.isStopped())
    this.outs.forEach(t => t.print())
  }

  schedule () {
    // stop working if blocked
    if (this.isBlocked()) return

    this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending)

    while (this.working.length < this.concurrency && this.pending.length) {
      let x = this.pending.shift()
      this.working.push(x)
      this.transform(x, (err, y) => {
        this.working.splice(this.working.indexOf(x), 1)
        if (err) {
          x.error = err
          this.failed.push(x)
        } else {
          if (this.outs.length) {
            this.outs.forEach(t => t.push(y))
          } else {
            if (this.root().listenerCount("data")) {
              this.root().emit("data", y)
            } else {
              this.finished.push(y)
            }
          }
        }

        this.schedule()
        this.root().emit("step", this.name, x.name)
      })
    }
  }

}

module.exports = Transform

这段代码目前还是雏形。

Transform类的设计类似node里的stream.Transform,但是它的设计目的不是buffering或流性能,而是作为并发编程的基础模块。

如果你熟悉流式编程,Transform的设计就很容易理解;在内部,Transform维护四个队列:

pending是input buffer

working是当前正在执行的任务

finished是output buffer,它的目的不是为了buffer输出,而是在没有其他输出办法的时候作一下buffer。

failed是失败的任务

Transform可以组合成DAG(Directed Acyclic Graph)使用,insouts用来存储前置和后置Transform的引用,pipe方法负责设置这种双向链接;最常见的情况是双向链表,即insouts都只有一个对象。但把他们设计成数组就可以允许fan-in, fan-out的结构。

pushpull是write和read的等价物。

schedule是核心函数,它的任务是填充working队列。在构造函数的参数里应该提供一个名字为transform的异步函数,schedule使用这个函数运行任务,在运行结束后,根据结果把任务推到failed队列、推到下一个Transformer、用root节点的emit输出、或者推到自己的finished队列里。

Transform设计的核心思想,就是把并发任务的状态,不使用对象属性来编码,只使用队列位置来编码;任何一个子任务,在任何时刻,仅存在于一个Transform对象的某个队列中。换句话说,它等于把并发任务用资源来建模。如果你熟悉restful api对过程或状态的建模方式就很容易理解这一点。

Transform中,任何transform异步函数的返回,都是一个stepstep是用Transform实现并发组合的最重要概念;

每一次transform函数返回,都会发生改变自己的队列或向后续的Transform对象push任务的动作,这个push动作会触发后续Transformschedule方法;step结束时自己的schedule方法也会被调用,它会重新填充任务。在这些动作结束后,所有Transform的队列变化,就是整个组合任务状态机的下一个状态。

这个状态是显式的,可以打印出来看,对debug非常有帮助;虽然异步i/o会让这种状态具有不确定性,但至少这里坚持了组合状态机模型在处理并发问题时的同步原则,每个step结束时整体做一次状态迁移,这个状态迁移可以良好定义和观察,这是Event模型下并发编程和Thread模型的重要区别。后者遇到并发逻辑引起的微妙错误时,很难捕捉现场分析,因为每一个Thread是黑盒。

transform返回开始到emit(step)之间的一连串连锁动作都是中间过程,最终实现一次完整的状态迁移,这个过程必须是同步的。不应在这里出现异步、setImmediate或者process.nextTick等调用,这会带来额外的不确定因素和极难发现和修复的bug。

在前面很长一段时间的并发编程实践中,我指出过Promise的race/settle和错误处理逻辑在一些场景下的困难。Promise的过程逻辑不完备。我也花了很多力气试图在Process代数层面上把error, success, finish, race, settle, abort, pause, resume, 和他们的组合逻辑定义出来,但最终发现这很困难,因为实际编程中各种处理情况太多了。

所以在Transform的设计中,这些逻辑全部被抛弃了,因为事实上它们都不是真正的基础并发逻辑

Transform试图实现组合的基础并发逻辑只有一个:stoppedstopped的定义非常简单:在一次step结束时,所有的Transformworking队列为空,就是(整体的)stopped。这里要再次强调前述的step结束时同步方法的必要性,如果你在schedule里使用了异步方法调用,那么这个stopped的判断就可能是错的,因为schedule可能会在event loop里放置了一个马上就会产生新的working任务的动作,而isStopped()的判断就错了。

stopped时,整体组合状态可能是success, error, paused, 等等,都不难判断,但目前代码尚未稳定,我不打算加入语法糖。

在blocking i/o和同步的编程模式下,因果链和代码书写形式是一致的,但是在异步编程下,因果是异步和并发的,你只能去改变因,然后去观察果,这是很多程序员不适应异步编程的根本原因,因为它要改变思维的习惯。

使用Transform来处理并发编程,仍然是在试图重建这个因果链,即使他们是并发的,但是我们要有一个办法把他们串起来;

前面说到的isStopped()是观察到的果,能够影响它的因,是isBlocked()函数,这个函数在schedule中被调用,如果估值为true,就会阻止schedule继续向working队列调度任务。

这里写的isBlocked()的代码实现只是一个例子;可以阻止schedule的原因可能有很多,比如出现错误,或者输出buffer满了,这些可以由实现者自己去定义。他们是policy,isBlocked()本身是mechanism。这个策略的粒度是每个Transform对象都可以有自己的策略。比如一个删除临时文件的操作,结果是无关痛痒的,那么它不该因为error就block。

isBlocked()逻辑可以象示例代码里那样向下chain起来,即只要有后续任务block了,前置任务就该停下来;这在绝大多数情况下都是合理的逻辑。因为虽然我们写的是流式处理办法,但是我们不是在处理octet-stream,追求性能的buffering和flow control都没什么意义,如果前面任务在copy文件后面的任务要移动到目标文件夹,如果目标文件夹出了问题前面快速移动了大量文件最终也无法成功。

如果组合状态机停止了,向其中的任何一个Transform对象执行push或者pull操作都可以让整个状态机继续动起来。从root节点push是常见情况,从leaf节点pull也是,向中间节点push也是可能的;

资源建模的一个好处是你可以把状态呈现给用户,如果一个复制文件的任务因为文件名冲突而fail,你还可以让用户选择处理策略,例如覆盖或者重命名,在用户选择了操作之后,代码会从某个Transform对象的failed队列中取走一个对象,修改策略参数后重新push进去,那么这个状态机可以继续执行下去;这种可处理的错误不该成为block整个状态机工作(复制其他文件和文件夹)的原因,除非他们积累到可观的数量,在Transform模式下这些都非常容易实现,开发者可以很简单的编写isBlocked()的策略;

和node的stream一样,Transform是lazy的,纯粹的push machine可能会在中间节点buffer大量的任务,这对把任务作为流处理来说是不合适的;同时,Lazy对于停下来的组合状态机能继续run起来很重要,pull方法就是这个设计目的,它的schedule逻辑和push一样,只是方向相反;如果设置了Leaf节点会因为输出缓冲而block,它就可以block整个状态机(或者其中的一部分),这在某些情况下也是有用的功能,如果整个状态机的输出因为某种原因暂时无法被立刻消费掉。

abort逻辑没有在代码中实现,但它很容易,可以遍历所有的Transform,如果working队列中的对象有abort方法,就调用它;这不是个立即的中止,该对象仍然要通过callback返回才能stop。如果要全局的block,可以把所有的Leaf Node都pipe到一个sink节点去,把这个sink节点强制设置成isBlocked,可以block全部。pauseresume也是非常类似的逻辑。

当然你可能会遇到类似finally的逻辑是必须去执行的,即使在发生错误的时候,它意味着这个Transform要向前传递isBlocked信息,但是它的Schedule方法不必停止工作。它可以一直运行到把所有队列任务都处理完为止。

重载schedule方法也是可能的;例如你的任务之间有前后依赖的逻辑,你就可以重载schedule方法实现自己的调度方式。另外这里的schedule代码只基于transform函数,很显然如果transform本身是一个Transform对象它也应该工作,实现组合过程,包括Sequencer,Parallel等等,这些都是需要实现的。

总而言之,isBlockedschedule是分开的逻辑,它们有各自不同的设计目的和使命,你可以重载它们获得自己想要的结果。所以写在这里的代码,重要的不是他们的实现,而是其机制设计和界面设计,以及接口承诺;所有逻辑都是足够原子化的,每个函数只做一件事,isBlocked是因,可以根据需要选择策略,isStopped是果,通过step观察和实现后续逻辑。应该避免通过向基类添加新方法来扩展能力,因为Transform使用队列和任务描述状态,这个描述是完备的,机制也是完善的。

就像我在另一篇介绍JavaScript语言的文章里写的一样,如果针对问题的模型具有完备性,即使抽象,也可以通过组合基本操作和概念获得更多的特性,而不是在模型上增加概念,除非你认为模型不够完备。

软件工程中不是什么地方都要上状态机(automaton)这么严格的模型工具,项目软件里写到bug数量足够低就可以了,但是如果你要写系统软件或者对正确性有苛刻要求的东西,如果你没有用状态机建模,那么实际上你没有完备设计。

当然有了完备设计也不意味着软件没bug了,但一个好的设计可以让你对问题的理解、遇到问题时找到原因,有极大的帮助。

在复杂系统中,上述的同步方法状态机组合,和Hierarchical的状态机组合,是我们目前已知的两种具有完备性的模型方法。但是两者不同。虽然Transform的组合看起来是一个Hierarchy,但是它就像你在纸上画一棵树,它仍然是二维的,每个step的整体状态联动的迁移只是在populate一次状态迁移的范围,并不是几何级数的增加状态组合;所以我们仍然可以构筑一个线性的因果链,每个step因果因果这样的继续下去,和没有并发的状态机是一样。

本质上这是数学归纳法:如果我们能证明如果n正确,那么n+1是正确的,这就可以证明chain下去的状态组合即使是无穷也是正确的。

第二段代码是使用的一个示例,这个class没有必要,是为了保证和老代码接口兼容,因为有一些项目内其他代码的依赖性就不解释了,很容易看明白大概逻辑;列在这里只是展示一下Transform使用时pipe过程的代码样子。

const Promise = require("bluebird")
const path = require("path")
const fs = Promise.promisifyAll(require("fs"))
const EventEmitter = require("events")
const debug = require("debug")("dircopy")
const rimraf = require("rimraf")

const Transform = require("../lib/transform")
const { forceXstat } = require("../lib/xstat")
const fileCopy = require("./filecopy")

class DirCopy extends EventEmitter {

  constructor (src, tmp, files, getDirPath) {
    super()

    let dst = getDirPath()
    let pipe = new Transform({
      name: "copy",
      concurrency: 4,
      transform: (x, callback) =>
        (x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name),
          (err, fingerprint) => {
            delete x.abort
            if (err) {
              callback(err)
            } else {
              callback(null, (x.fingerprint = fingerprint, x))
            }
          }))
    }).pipe(new Transform({
      name: "stamp",
      transform: (x, callback) =>
        forceXstat(path.join(tmp, x.name), { hash: x.fingerprint },
          (err, xstat) => err
            ? callback(err)
            : callback(null, (x.uuid = xstat.uuid, x)))
    })).pipe(new Transform({
      name: "move",
      transform: (x, callback) =>
        fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err
          ? callback(err)
          : callback(null, x))
    })).pipe(new Transform({
      name: "remove",
      transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null))
    })).root()

    let count = 0

    // drain data
    pipe.on("data", data => this.emit("data", data))
    pipe.on("step", (tname, xname) => {
      debug("------------------------------------------")
      debug(`step ${count++}`, tname, xname)
      pipe.print()
      if (pipe.isStopped()) this.emit("stopped")
    })

    files.forEach(name => pipe.push({ name }))
    pipe.print()
    this.pipe = pipe
  }

}

module.exports = DirCopy

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/87172.html

相关文章

  • 白洁血战Node.js并发编程 01 状态机

    摘要:状态机状态机是模型层面的概念,与编程语言无关。状态机具有良好的可实现性和可测试性。在代码里,这是一个,但是我们在状态机模型中要把他理解为事件。 这一篇是这个系列的开篇,没有任何高级内容,就讲讲状态机。 状态机 状态机是模型层面的概念,与编程语言无关。它的目的是为对象行为建模,属于设计范畴。它的基础概念是状态(state)和事件(event)。 对象的内部结构描述为一组状态S1, S2,...

    fjcgreat 评论0 收藏0
  • 2017-08-10 前端日报

    摘要:前端日报精选译用搭建探索生命周期中的匿名递归浏览器端机器智能框架深入理解笔记和属性中文上海线下活动前端工程化架构实践沪江技术沙龙掘金周二放送追加视频知乎专栏第期聊一聊前端自动化测试上双关语来自前端的小段子,你看得懂吗众成翻 2017-08-10 前端日报 精选 [译] 用 Node.js 搭建 API Gateway探索 Service Worker 「生命周期」JavaScript ...

    wupengyu 评论0 收藏0
  • 2017-10-05 前端日报

    摘要:前端日报精选浏览器渲染过程与性能优化新版采用新引擎,速度是旧版的倍,名字和也变了中文与的使用个人文章在对比中理解掘金白洁血战并发编程异步英文 2017-10-05 前端日报 精选 浏览器渲染过程与性能优化Firefox 新版采用新引擎,速度是旧版的 2 倍,名字和 Logo 也变了8 Key React Component DecisionsThe Intl.PluralRules A...

    tracy 评论0 收藏0
  • 和少妇白洁一起学JavaScript之Async/Await

    摘要:匿名函数是我们喜欢的一个重要原因,也是,它们分别消除了很多代码细节上需要命名变量名或函数名的需要。这个匿名函数内,有更多的操作,根据的结果针对目录和文件做了不同处理,而且有递归。 能和微博上的 @响马 (fibjs作者)掰扯这个问题是我的荣幸。 事情缘起于知乎上的一个热贴,诸神都发表了意见: https://www.zhihu.com/questio... 这一篇不是要说明白什么是as...

    Bryan 评论0 收藏0
  • 和少妇白洁一起学JavaScript之Async/Await II

    摘要:的科学定义是或者,它的标志性原语是。能解决一类对语言的实现来说特别无力的状态机模型流程即状态。容易实现是需要和的一个重要原因。 前面写了一篇,写的很粗,这篇讲讲一些细节。实际上Fiber/Coroutine vs Async/Await之争不是一个简单的continuation如何实现的问题,而是两个完全不同的problem和solution domain。 Event Model 我...

    番茄西红柿 评论0 收藏0

发表评论

0条评论

Lemon_95

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<