摘要:上游水源通过里中的方法流入水电站中。当在接收数据中出现错误时发出。暂停可读流,不再发出事件恢复可读流,继续发出事件把这个可读流的输出传递给指定的流,两个流组成一个管道。
题外话
该文章整合了多篇网络文章(整合之处已设置超链接,可点击直接了解原文),目的仅仅是为了和大伙分享,更加通俗易懂的了解流的各个流程的初始。本人也是node的初学菜鸟,有描述错误或误人子弟的地方多请大神们多多指出。
readable 我们先来安利一些思路,方便理清楚逻辑:)。事件 查看原文读缓冲区(readable buffer):这里的读是个形容词,是指可读流临时存放data(只能是字符串或者Buffer,不能是数字)的缓冲区。(读缓冲区就像一个水电站一样,感觉这样描述比较好理解flowing、paused模式)
flowing模式:即流动模式,就像打开水电站的水闸一样,上游的水和下游完完全全连通直到上游来源的数据耗尽。
paused模式:即暂停模式,就像水电站的水闸在你指定的时候(使用stream.read())才会打开。不过,当你使用read()打开水闸的时候是一个超自然现象---水电站里的水瞬间被抽干,上游的水还没来得及填充水电站。然后自动关闭水闸,等待你的下一次“惠顾“read()。
_read:上游水源通过_read里中的push、unshift方法流入水电站中。
函数 查看原文readable:在数据块可以从流中读取的时候发出。它对应的处理器没有参数,可以在处理器里调用read([size])方法读取数据。
data:有数据可读时发出。它对应的处理器有一个参数,代表数据。如果你只想快快地读取一个流的数据,给data关联一个处理器是最方便的办法。处理器的参数是Buffer对象,如果你调用了Readable的setEncoding(encoding)方法,处理器的参数就是String对象。
end:当数据被读完时发出。对应的处理器没有参数。
close:当底层的资源,如文件,已关闭时发出。不是所有的Readable流都会发出这个事件。对应的处理器没有参数。
error:当在接收数据中出现错误时发出。对应的处理器参数是Error的实例,它的message属性描述了错误原因,stack属性保存了发生错误时的堆栈信息。
流动模式和暂停模式切换 查看原文read([size]):如果你给read方法传递了一个大小作为参数,那它会返回指定数量的数据,如果数据不足,就会返回null。如果你不给read方法传参,它会返回内部缓冲区里的所有数据,如果没有数据,会返回null,此时有可能说明遇到了文件末尾。read返回的数据可能是Buffer对象,也可能是String对象。
setEncoding(encoding):给流设置一个编码格式,用于解码读到的数据。调用此方法后,read([size])方法返回String对象。
pause():暂停可读流,不再发出data事件
resume():恢复可读流,继续发出data事件
pipe(destination,[options]):把这个可读流的输出传递给destination指定的Writable流,两个流组成一个管道。options是一个JS对象,这个对象有一个布尔类型的end属性,默认值为true,当end为true时,Readable结束时自动结束Writable。注意,我们可以把一个Readable与若干Writable连在一起,组成多个管道,每一个Writable都能得到同样的数据。这个方法返回destination,如果destination本身又是Readable流,就可以级联调用pipe(比如我们在使用gzip压缩、解压缩时就会这样,马上会讲到)。
unpipe([destination]):端口与指定destination的管道。不传递destination时,断开与这个可读流连在一起的所有管道。
通过添加 data 事件监听器来启动数据监听
调用 resume() 方法启动数据流
调用 pipe() 方法将数据转接到另一个 可写流
触发准备数据(_read)的方法在流没有 pipe() 时,调用 pause() 方法可以将流暂停
pipe() 时,需要移除所有 data 事件的监听,再调用 unpipe() 方法
工作流程 查看原文data listener
readable listener
read()——如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据(_read())或者已经结束读取(push(null)),那么就放弃准备数据。
1.在paused模式下则读取全部缓冲区的长度;若读取的字节数(n)大于设置的缓冲区最大值,则适当扩大缓冲区的大小(默认为16k,最大为8m);若读取的长度大于当前缓冲区的大小,设置needReadable属性并准备数据等待下一次读取。
2.如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据(_read())或者已经结束读取(push(null)),那么就放弃准备数据。
3.针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。
1.对于处在flowing模式下的读取,每次只读缓冲区中第一个buffer的长度
2.针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。
实例//这是一个将存放多条json字符串的txt文件读取成json的例子 const stream = require("stream"); const fs = require("fs"); const util = require("util"); function JSONLineReader(source) { stream.Readable.call(this); this._source = source; this._foundLineEnd = false; this._buffer = ""; source.on("readable", function() {//监听source什么时候准备好,那么我们就可以用read()或则readable listener去触发JSONLineReader的_read方法 this.read(); // this.on("readable", function(data) { // console.log("readable"); // }); }.bind(this)) } util.inherits(JSONLineReader, stream.Readable); JSONLineReader.prototype._read = function(size) { var chunk; var line; var lineIndex; var result; if (this._buffer.length === 0) { chunk = this._source.read(); this._buffer += chunk; //一次就拿完 只是看什么时候push null } lineIndex = this._buffer.indexOf(" "); if (lineIndex !== -1) { line = this._buffer.slice(0, lineIndex); if (line) { result = JSON.parse(line); this._buffer = this._buffer.slice(lineIndex + 1); this.emit("object", result);util.inspect(result)) this.push(util.inspect(result)); } else { this._buffer = this._buffer.slice(1); } } } let input = fs.createReadStream(__dirname + "/json-lines.txt", { encoding: "utf8" }); var jsonLineReader = new JSONLineReader(input); jsonLineReader.on("object", function(obj) { console.log("pos:", obj); }) /*json-lines.txt {"success":false,"code":501} {"success":true,"code":202} {"success":false,"code":503} {"success":true,"code":204} {"success":false,"code":505} {"success":true,"code":206} {"success":false,"code":507} {"success":true,"code":208} {"success":false,"code":509} */
let stream = require("stream"); let util = require("util"); util.inherits(flowingReadableDemo, stream.Readable); function flowingReadableDemo(opt) { stream.Readable.call(this, opt); this.quotes = ["yessdasdsa", "noasdasdas", "maybe"]; this._index = 0; } flowingReadableDemo.prototype._read = function() { if (this._index >= this.quotes.length) { this.push(null); } else { this.push(this.quotes[this._index]); this._index += 1; } }; let r = new flowingReadableDemo(); r.on("data", function(data) { console.log("Callback read: " + data.toString()); // flowing状态下,我们无需执行read,仅需要设置data事件处理函数或者设定导流目标pipe }); r.on("end", function(data) { console.log("No more answers."); });
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/87390.html
摘要:内部架构上图表示一个实例的组成部分部分缓冲数组内部函数部分缓冲链表内部函数实例必须实现的内部函数以及系统提供的回调函数。有三个参数,第一个为待处理的数据,第二个为编码,第三个为回调函数。 Transform流特性 在开发中直接接触Transform流的情况不是很多,往往是使用相对成熟的模块或者封装的API来完成流的处理,最为特殊的莫过于through2模块和gulp流操作。那么,Tra...
摘要:回调函数中检测该次写入是否被缓冲,若是,触发事件。若目标可写流表示该写入操作需要进行缓冲,则立刻将源可读流切换至暂停模式。监听源可读流的事件,相应地结束目标可写流。 在Node.js中,流(Stream)是其众多原生对象的基类,它对处理潜在的大文件提供了支持,也抽象了一些场景下的数据处理和传递。在它对外暴露的接口中,最为神奇的,莫过于导流(pipe)方法了。鉴于近期自己正在阅读Node...
摘要:是消费数据的,从中获取数据,然后对得到的块数据进行处理,至于如何处理,就依赖于具体实现也就是的实现。也可以说是建立在的基础上。 1. 认识Stream Stream的概念最早来源于Unix系统,其可以将一个大型系统拆分成一些小的组件,然后将这些小的组件可以很好地运行 TCP/IP协议中的TCP协议也用到了Stream的思想,进而可以进行流量控制、差错控制 在unix中通过 |来表示流...
摘要:方法也可以接收一个参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。读取数据大部分情况下我们只要简单的使用方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。 介绍本文介绍了使用 node.js streams 开发程序的基本方法。 We should have some ways of connecting programs ...
摘要:事件的触发频次同样是由实现者决定,譬如在进行文件读取时,可能每行都会触发一次而在请求处理时,可能数的数据才会触发一次。如果有参数传入,它会让可读流停止流向某个特定的目的地,否则,它会移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文节选自 Node.js Chea...
阅读 4641·2021-10-25 09:48
阅读 3219·2021-09-07 09:59
阅读 2202·2021-09-06 15:01
阅读 2703·2021-09-02 15:21
阅读 2739·2019-08-30 14:14
阅读 2192·2019-08-29 13:59
阅读 2524·2019-08-29 11:02
阅读 2542·2019-08-26 13:33