资讯专栏INFORMATION COLUMN

流的剖析和实现

LeviDing / 360人阅读

摘要:开始读取位置结束读取位置包括结束位置如果为,则文件描述符不会被关闭,即使有错误。需要程序负责关闭它,并且确保没有文件描述符泄漏。

流的定义

流是抽象化的概念,形象生动的描述了数据的流动、变化。
具体来说,在node中流是处理数据的抽象接口,继承了EventEmitter,通过这个接口我们能够控制流的开关,流动的方向等等。
比较形象直观一点类似我们在linux上使用shell,通过管道,链接处理各个部分,下面是我写的一个命令,筛选出version并导出到文件中。

流的分类

Readable(可读流)

Writable(可写流)

Duplex(可读可写的流)

Transform(在读写过程中可以修改和变化的Duplex流)

流按照功能大致划分为以上四类,具体应用的话有很多场景,如下图所示(来源:参考链接2)

下面我根据流的分类,列举一些demo应用实例

Readable

可读流能接受各种数据源,例如控制台的输入,文件,字符串等等,就如介绍中所说是抽象接口,可以面向各种形式的输入,下面举几个例子。

文件流
require("fs").createReadStream("./1.txt",{
    encoding: "utf8"
}).on("data",(data) => {
    console.log(data)
})
// 输出  hello jsdt

说明 为什么要用流来读取,直接用fs.readFile岂不是更方便吗,因为readFile是整体操作,会将文件全部读到内存中在做处理,这样的话文件如果很大,程序就会很卡,甚至报错。

标准输入流
process.stdin.setEncoding("utf8");
process.stdin.on("data",(data) => {
    console.log("输出: "+ data)
})
node 运行code,然后输入 hello jsdt
输出: hello jsdt

说明 这个做acm的时候会用到,或者平时自己写一些交互式应用的时候

普通数据流
let {Readable} = require("stream")
let util = require("util")
class Test extends  Readable{
    constructor(){
        super()
        this.dataSource = 5
    }
    _read(){
        if(this.dataSource-->0){
            this.push(this.dataSource+"");
        }else{
            this.push(null);
        }
    }
}
let counter = new Test();
counter.on("data",function(data){
    console.log(data.toString())
});
输出:
4
3
2
1

说明 重写_read方法,自定义输入的逻辑,上面示例中是自己逻辑中产生的一个数据源。

Writable 文件流
let dataSource = "hello jsdt",i = 0;
(function(){
    let ws = require("fs").createWriteStream("./1.txt",{
        encoding: "utf8"
    })
    let flag = true;
    while(flag && i


说明 闭包自执行,通过流将数据写入到文件中,上面是输出结果。

自定义输出
let {Writable} = require("stream")
let arr = []
let ws = Writable({
    write(chunk,encoding,cb){
        arr.push(chunk)
        cb()
    }
})
for(let i = 1; i<= 3;i++){
  ws.write(""+i,"utf8",()=>{})
}
process.nextTick(function () {
    console.log(arr.toString())
})
//  输出 1,2,3

说明 上面重写了流的write方法,可以自定义写逻辑

Duplex
require("net").createServer(socket => {
    socket.on("data",data => {
        console.log("client message " + data);
        socket.write("server message " + "hello client ");
    })
}).listen(8080,() =>{})


说明 作为可写流一面socket可以向客户端发送信息,做为可读流一面可以监听data事件,收到客户端发送过信息

Transform
let t = require("stream").Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
// 输入abc
// 输出ABC

说明 上面使用转换流,实现了terminal上小写输入,对应大写输出的功能

流中数据分类

二进制模式

对象模式

在创建流的时候可以指定配置,objectMode默认为false,设为true切换到对象模式。二进制即buffer模式,可读或可写流都会将数据会缓存数据在buffer中。

流的剖析

通过上面的介绍我们明确了流的定义,并按照功能对流进行了分类,下面我进行下剖析,总的来说流的各种形态间转化传输底层都是二进制,具体到使用形态上有buffer,string等等。
首先详细说下可读流,可读流有两种模式,默认为paused模式。

flowing 按照初始化配置,自动读取数据,并通过观察者模式,直接将数据提供给订阅者

paused 显式调用流的read方法读取数据

其中如果我们想切换到流动模式可以通过监听data事件的方式、或者调用stream.resume()、stream.pipe() 这些方法。

可读流源码分析
// 可读流入口,根据配置返回一个可读流
fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};

// 实现原理是ReadStream.prototype.__proto__ = Readable.prototype,可以继承Readable上的一些方法
util.inherits(ReadStream, Readable);
fs.ReadStream = ReadStream;

function ReadStream(path, options) {
  // 非new方式调用,直接返回一个实例
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  options = copyObject(getOptions(options, {}));
  if (options.highWaterMark === undefined)
  // highWaterMark默认值为64k,设置了flow模式下缓冲区的大小
    options.highWaterMark = 64 * 1024;  

  Readable.call(this, options);

  handleError((this.path = getPathFromURL(path)));
  // 文件描述符,根据这个句柄找到文件
  this.fd = options.fd === undefined ? null : options.fd;
  // flags打开文件要做的操作,默认为"r"
  this.flags = options.flags === undefined ? "r" : options.flags;
  // 用于设置文件模式(权限和粘结位),仅限创建文件时。
  this.mode = options.mode === undefined ? 0o666 : options.mode;
  // 开始读取位置
  this.start = options.start;
  // 结束读取位置(!!!包括结束位置)
  this.end = options.end;
  /**
   * 如果 autoClose 为 false,则文件描述符不会被关闭,即使有错误。 
   * 需要程序负责关闭它,并且确保没有文件描述符泄漏。 
   * 如果 autoClose 被设置为 true(默认),则在 error 或 end 时,文件描述符会被自动关闭
   */
  this.autoClose = options.autoClose === undefined ? true : options.autoClose;
   this.pos = this.start;
   
  }
// 适合传入句柄的情况,例如fd: 0,这样就不是文件,而是控制台输入的数据了
  if (typeof this.fd !== "number")
    this.open();
  this.on("end", function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}

// 打开文件,并触发open事件,只有打开了才能读取,所以在回调中触发open事件,看下步操作
ReadStream.prototype.open = function() {
  var self = this;
  fs.open(this.path, this.flags, this.mode, function(er, fd) {
    self.fd = fd;
    self.emit("open", fd);
    //  start the flow of data.
    self.read();
  });
};
Readable.prototype.read = function(n) {
    // 当read(0)时,如果缓存中已有数据,则触发readable事件,相当于刷新下缓存。否则触发end事件
if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  //  若可读流已经被传入了终止符(null),且缓冲中没有遗留数据,则结束这个可读流
  if (n === 0 && state.ended) {
      if (state.length === 0)
        endReadable(this);
      return null;
    }
    //  若目前缓冲中的数据大小为空,或未超过设置的警戒线,则进行一次数据读取。
      if (state.length === 0 || state.length - n < state.highWaterMark) {
        doRead = true;
      }
        if (state.ended || state.reading) {
          doRead = false;
        } else if (doRead) {
          state.reading = true;
          state.sync = true;
          this._read(state.highWaterMark);
     }


}
ReadStream.prototype._read = function(n) {
  if (typeof this.fd !== "number") {
    // 防止重复绑定open事件,当文件打开且emit open事件,此时才会进行真正的读操作
    return this.once("open", function() {
      this._read(n);
    });
  }
 // 然后读数据的时候会计算实际读的数量
 function howMuchToRead(n, state) {
    //  如果读的数量超过highWaterMark,则重新计算highWaterMark
    if (n > state.highWaterMark)
      state.highWaterMark = computeNewHighWaterMark(n);
    if (n <= state.length)
      return n;
 }
  // 经过上面一系列的准备工作,下面开始真正的读操作咯
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
      if (bytesRead > 0) {
        this.bytesRead += bytesRead;
      }
      this.push(b);
  });
};

// 上面整个过程是paused的流程,其中flow模式又有所不同,如下所示
// 如果监听了data事件,则会调用this.resume(),开始流动模式
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  if (ev === "data") {
    //  Start flowing on next tick if stream isn"t explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  }
  }
// flow模式下 流内部自动触发data事件,循环读取数据
function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  while (state.flowing && stream.read() !== null);
}
// 然后触发 data事件,循环发射数据
stream.emit("data", chunk);

总结 上面是可读流的源码分析,摘要了关键部分,下面在梳理一下,当通过ReadStream创建一个流的时候,默认会触发readable事件,进入暂停模式,此时内部维护的有一个缓冲区,在readable事件回调逻辑中进行read操作,首先会通过howMuchToRead方法计算实际读取的数量,如果现有数据小于highWaterMark,内部会进行this._read(state.highWaterMark)操作,其回调中会进行push操作,push在调用readableAddChunk将数据放到内部维护的缓存中,反之则从fromList中读取缓存中的数据,然后返回。而如果监听了data事件,代码中所示会调用this.resume(),将流状态设置为flowing模式,然后resume()->resume_()->flow()的调用顺序执行flow方法循环读取数据,触发data事件,完成数据的自动读取,然后发射给调用者,会不停的循环整个过程。上面比较值的注意一点的就是flow模式和paused模式区别,如果是flow模式在addChunk的时候,如下所示

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit("data", chunk);
    stream.read(0);
  } 
}

会自动发射数据,不会走缓存,而paused模式会走一遍内部的缓存机制。
根据上面node源码的分析过程,下面图形化描述下整个流程。

自己实现的一个可读流

可写流源码分析
// 1:首先第一步根据createWriteStream传入参数进行初始化
// 2:调用写操作
Writable.prototype.write = function(chunk, encoding, cb) {
  if (state.ended)
   //在end继续写入会emit一个error事件
    writeAfterEnd(this, cb);
  else if (validChunk(this, state, chunk, cb)) {
  //在校验数据chunk合法的情况下才会进行后续的写逻辑
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }
return ret;
};

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);

  if (chunk instanceof Buffer)
    encoding = "buffer";
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;//实时更新缓冲区长度

  var ret = state.length < state.highWaterMark;//判断缓存区是否超过水位线(highWaterMark,不传默认16k,源码_stream_writeable.js--40行)设置
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
 //如果此时处于写状态,将新添加的数据放到缓冲池链表尾部
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    //写入数据
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }
return ret;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  if (writev)
    //一次写入多个数据块
    stream._writev(chunk, state.onwrite);
  else
  //一次写入一个数据块
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}
function onwrite(stream, er) {
    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
        //清空缓冲池 ,不为空,则循环执行 _write() 写入单个数据块
      clearBuffer(stream, state);
    }
  }
}
function clearBuffer(stream, state) {
    // 单个数据写入
    while (entry) {
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;
        //开启数据写操作
      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
    }
}

总结 上面是可写流源码分析,摘要了关键流程,首先根据传入参数进行初始化配置,然后用户调用write方法进行写入,写入前会判断一下是否超过水位线,超过触发drain事件,返回false,注意一点此时仍可以进行写入,返回false只是告诉你,已经满了,后需要不要写入还是靠用户根据这个返回值来控制。如果没超过,在写之前会先判断是否处于写状态,是的话将数据放到缓存中,反之会进行doWrite <-->clearBuffer这样的循环操作,一直到数据缓存中数据消耗完为止。清理完了之后,后续调用write的返回值ret为false,从而继续写,一直循环前面描述的整个过程,直到数据源写完为止。总的来说,因为可写流内部只有一个状态,复杂度低于可读流,整个过程还是比较清晰的,不在图形化流程。

自己实现的一个可写流

说明
node源码分析版本基于v8.9.4
参考资料
http://nodejs.cn/api/
https://medium.freecodecamp.o...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/107173.html

相关文章

  • [译]RxJS文档03——剖析Observable

    摘要:通过执行和可以向订阅者推送不同的通知。之后,执行过程可能被处理掉。当调用并得到观察者时,在中传入的函数将会被执行。每次执行都会触发一个单独针对当前的运行逻辑。通知不发出任何值,表示流的结束。 原文:http://reactivex.io/rxjs/manu... Rx.Observalbe.create()或者创建操作符,可以 创建(created) Observable流。Obser...

    netScorpion 评论0 收藏0
  • 高薪程序员&amp;面试题精讲系列22之说说Java的IO流,常用哪些IO流?

    摘要:一面试题及剖析今日面试题今天壹哥带各位复习一块可能会令初学者比较头疼的内容,起码当时让我很有些头疼的内容,那就是流。在这里壹哥会从两部分展开介绍流,即与流。除此之外尽量使用字节流。关闭此输入流并释放与流相关联的任何系统资源。 一. 面试题及剖析 1. 今日面试题 今天 壹哥 带各位复习一块可...

    fnngj 评论0 收藏0
  • 《Java8实战》-第四章读书笔记(引入流Stream)

    摘要:内部迭代与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。流只能遍历一次请注意,和迭代器类似,流只能遍历一次。 流(Stream) 流是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我会在后面的笔记中...

    _ivan 评论0 收藏0
  • Node.js 指南(HTTP事务的剖析

    摘要:为了处理请求流上的错误,我们将错误记录到并发送状态码以指示,但是,在实际应用程序中,我们需要检查错误以确定正确的状态码和消息是什么,与通常的错误一样,你应该查阅错误文档。通过对象发送状态码和数据。 HTTP事务的剖析 本指南的目的是让你充分了解Node.js HTTP处理的过程,我们假设你在一般意义上知道HTTP请求的工作方式,无论语言或编程环境如何,我们还假设你对Node.js Ev...

    ASCH 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<