资讯专栏INFORMATION COLUMN

node源码解析 -- Stream探究

gyl_coder / 2748人阅读

摘要:是消费数据的,从中获取数据,然后对得到的块数据进行处理,至于如何处理,就依赖于具体实现也就是的实现。也可以说是建立在的基础上。

1. 认识Stream

Stream的概念最早来源于Unix系统,其可以将一个大型系统拆分成一些小的组件,然后将这些小的组件可以很好地运行

TCP/IP协议中的TCP协议也用到了Stream的思想,进而可以进行流量控制、差错控制

在unix中通过 |来表示流;node中通过pipe方法

Stream可以认为数据就像管道一样,多次不断地被传递下去,而不是一次性全部传递给下游

2. node中的stream

在node stream中可以看到第一段的描述:

A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter

对上面一段话进行解析,可以得到如下几点:

Stream是Node中一个非常重要的概念,被大量对象实现,尤其是Node中的I/O操作

Stream是一个抽像的接口,一般不会直接使用,需要实现内部的某些抽象方法(例如_read、_write、_transform)

Stream是EventEmitter的子类,实际上Stream的数据传递内部依然是通过事件(data)来实现的

Stream分为四种:readable、writeable、Duplex、transform

3.Readable Stream 与 Writeable Stream 3.1 二者的关系

Readable Stream是提供数据的Stream,外部来源的数据均会存储到内部的buffer数组内缓存起来。

writeable Stream是消费数据的Stream,从readable stream中获取数据,然后对得到的chunk块数据进行处理,至于如何处理,就依赖于具体实现(也就是_write的实现)。

首先看看Readdable Streamwriteable stream二者之间的流动关系:

3.2 pipe的流程解析

stream内部是如何从readable stream流到writeable stream里面呢?有两种方法:

a) pipe 连接两个stream

先看一个简单地demo

var Read = require("stream").Readable;
var Write = require("stream").Writable;
var r = new Read();
var w = new Write();

r.push("hello ");
r.push("world!");
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.pipe(w);

pipe是一种最简单直接的方法连接两个stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动:

Readable.prototype.pipe = function (dest, pipeOpts) {
    var src = this;
    ...
    src.on("data", ondata);
    
    function ondata(chunk) {
        var ret = dest.write(chunk);
        if (false === ret) {
              debug("false write response, pause",
            src._readableState.awaitDrain);
              src._readableState.awaitDrain++;
              src.pause();
        }
    }
    ...
}

b) 事件data + 事件drain联合实现

var Read = require("stream").Readable;
var Write = require("stream").Writable;
var r = new Read();
var w = new Write();

r.push("hello ");
r.push("world!");
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.on("data", function (chunk) {
    if (!w.write(chunk)) {
        r.pause();
    }
})

w.on("drain", function () {
    r.resume();
})

// hello
// world!
4 Readable Stream的模式 4.1 内部模式的实现

Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式---自动流动还是手工流动。那如何触发这两种模式呢:

flowing mode: 注册事件data、调用resume方法、调用pipe方法

paused mode: 调用pause方法(没有pipe方法)、移除data事件 && unpipe所有pipe

让我们再深入一些,看看里面具体是如何实现的:

// data事件触发flowing mode
Readable.prototype.on = function(ev, fn) {
    ...
    if (ev === "data" && false !== this._readableState.flowing) {
        this.resume();
      }
      ...
}

// resume触发flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
           debug("resume");
           state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法触发flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

结论

两种方式取决于一个flowing字段:true --> flowing mode;false --> paused mode

三种方式最后均是通过resume方法,将state.flowing = true

4.2 两种模式的操作

a) paused mode

在paused mode下,需要手动地读取数据,并且可以直接指定读取数据的长度:

var Read = require("stream").Readable;
var r = new Read();

r.push("hello");
r.push("world");
r.push(null);

console.log("输出结果为: ", r.read(1).toString())
// 输出结果为: "h"

还可以通过监听事件readable,触发时手工读取chunk数据:

var Read = require("stream").Readable;
var r = new Read();

r.push("hello");
r.push("world");
r.push(null);

r.on("readable", function () {
    var chunk = r.read();
    console.log("get data by readable event: ", chunk.toString())
});

// get data by readable event:  hello world!

需要注意的是,一旦注册了readable事件,必须手工读取read数据,否则数据就会流失,看看内部实现:

function emitReadable_(stream) {
    debug("emit readable");
    stream.emit("readable");
    flow(stream);
}

function flow(stream) {
    var state = stream._readableState;
    debug("flow", state.flowing);
    if (state.flowing) {
           do {    
              var chunk = stream.read();
        } while (null !== chunk && state.flowing);
    }
}

Readable.prototype.read = function (n) {
    ...
    var res = fromList(n, state);
    
    if (!util.isNull(ret)) {
        this.emit("data", ret);
    }
    ...
}

flow方法直接read数据,将得到的数据通过事件data交付出去,然而此处没有注册data事件监控,因此,得到的chunk数据并没有交付给任何对象,这样数据就白白流失了,所以在触发emit("readable")时,需要提前read数据。

b) flowing mode

通过注册data、pipe、resume可以自动获取所需要的数据,看看内部实现:

// 事件data方式
var Read = require("stream").Readable;

var r = new Read();

r.push("hello ");
r.push("world!");
r.push(null)

r.on("data", function (chunk) {
    console.log("chunk :", chunk.toString())
})
// chunk : hello 
// chunk : world!
// 通过pipe方式
var r = new Read();

r.push("hello ");
r.push("world!");
r.push(null)

r.pipe(process.stdout)
// hello world!

c) 两种mode的总结

5. transform stream的实现

用过browserify的人都知道,browserify是一种基于stream的模块打包工具,里面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify内部通过through2构建了很多tranform stream。也可以说browserify是建立在transform stream的基础上。那么具备readable、writeablestream的transform stream内部是如何工作的呢?

6. 自定义stream

自定义stream很简单,只要实现相应的内部待实现方法就可以了,具体来说:

readable stream: 实现_read方法来解决数据的获取问题

writeable stream: 实现_write方法来解决数据的去向问题

tranform stream: 实现_tranform方法来解决数据存放在buffer前的转换工作

// 自定义readable stream的实现
var Stream = require("stream");
var Read = Stream.Readable;
var util = require("util");

util.inherits(MyReadStream, Read);

function MyReadStream(data, opt) {
    Read.call(this, opt);
    this.data = data || [];
}
MyReadStream.prototype._read = function () {
    var _this = this;
    this.data.forEach(function (d) {
        _this.push(d);
    })
    this.push(null);
}

var data = ["aa", "bb", "cc"];
var r = new MyReadStream(data);

r.on("data", function (chunk) {
    console.log(chunk.toString());
})
7. 参考资料

stream-handbook

node-stream

iojs源码

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/85905.html

相关文章

  • 阅读gulp源码小结

    摘要:源码简介源码核心部分寥寥行。同时本身是直接继承于模块。写在末尾阅读代码的这一次,是我第一次阅读这种开源的模块化项目。深深的被震撼到了,认识到了模块化的巨大力量。就能完成非常复杂的事情,而不需要凡是亲力亲为,一行行代码,一个个小问题依次解决。 gulp源码简介 gulp源码核心部分寥寥60+行。但是通过这60+行代码,gulp给我们带来的确是前端自动化构建的便利。以往以为其源码肯定蛮复杂...

    Rocture 评论0 收藏0
  • 【Vue源码探究一】当我们引入Vue,我们引入了什么?

    摘要:源码版本构造器实例选项让我们用一段展示一下这三个概念其中的构造器实例实例名可以任意取,这里我们便于理解保持和文档一致选项即为传入构造器里的配置选项。其实构造器上也绑了不少好用的方法。 源码版本:2.0.5 构造器、实例、选项 让我们用一段demo展示一下这三个概念: //HTML {{ message }} //JS var vm = new Vue({ el: #app,...

    mengbo 评论0 收藏0
  • 通过源码解析 Node.js 中导流(pipe)的实现

    摘要:回调函数中检测该次写入是否被缓冲,若是,触发事件。若目标可写流表示该写入操作需要进行缓冲,则立刻将源可读流切换至暂停模式。监听源可读流的事件,相应地结束目标可写流。 在Node.js中,流(Stream)是其众多原生对象的基类,它对处理潜在的大文件提供了支持,也抽象了一些场景下的数据处理和传递。在它对外暴露的接口中,最为神奇的,莫过于导流(pipe)方法了。鉴于近期自己正在阅读Node...

    defcon 评论0 收藏0
  • through2原理解析

    摘要:的源码仅仅就多行,本质上就是对于原生的流进行的封装,先来看下。是一个双工流,既可读,也可写,但是与还是有着一些区别,的写和读可以说是没有任何的关联,是两个缓冲区和管道互补干扰,而将其输入和输出是存在相互关联的,中间做了处理。 写在前面 through2经常被用于处理node的stream,假如使用过gulp的话,对于这个包一定不会陌生,如: gulp.task(rewrite, () ...

    leap_frog 评论0 收藏0
  • Spring源码一(容器的基本实现1)

    摘要:下面跟踪代码到这个实现中看看是怎么做的在实例化的过程中,在构造函数中调用了其超类的构造函数,而在超类中对其所处换环境进行的判断,所谓的环境呢,事实上指得就是是通过,还是通过加载的上下文,这也就意味着不同方式加载可能存在某些不同。 前言 本文基于《Spring源码深度解析》学习, 《Spring源码深度解析》讲解的Spring版本低于Spring3.1,当前阅读的版本为Spring5.x...

    awokezhou 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<