资讯专栏INFORMATION COLUMN

初探Node中的stream

learn_shifeng / 2089人阅读

摘要:则会立马再次读取填满最高水位线可写流这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。返回值表示你是否应该继续立即写入。最好的办法是等待事件后,再写入数据。缓存区未满写入方法是同步的,但是写入文件的过程异步的。

Stream流有以下四种类型:

Readable - 可读操作

Writable - 可写操作

Duplex - 可读可写操作

Transform - 操作被写入数据,然后读出结果

可读流(Readable stream)

可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来来自可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种模式: 流动模式(flowing mode) 和 暂停模式(paused mode). 流动模式(flowing mode)时,尽快的从底层系统读取数据并提供给你的程序。 暂停模式(paused mode)时, 你必须明确的调用 stream.read() 来读取数据。 暂停模式(paused mode) 是默认模式。
可以通过下面几个方法,将流切换到流动模式(flowing mode)。

let fs = require("fs");
/**
 * 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
 监听 "data" 事件
 调用 stream.resume() 方法
 调用 stream.pipe() 方法将数据发送到 Writable
 */
let rs = fs.createReadStream("./1.txt",{
    highWaterMark:3
});
/*
269
 stream.emit("data", chunk);
    stream.read(0);
rs.on("data",function (data) {
    console.log(data);
});
rs.on("end",function () {
    console.log("end");
});*/
//当你监听 readable事件的时候,会进入暂停模式
//当监听readable事件的时候,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
//self.read(0); 只填充缓存,但是并不会发射data事件,但是会发射stream.emit("readable");事件
//this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节
rs.on("readable",function(){
    //length就是指得缓存区数据的大小
    // state.length +=  chunk.length;==3
    console.log(rs._readableState.length);
    //read如果不加参数表示读取整个缓存区数据
    //读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
    let ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);
   /* ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);*/
    //当你读完指定的字节后,如果可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});
可写流(Writable stream )

这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。返回值表示你是否应该继续立即写入。如果数据要缓存在内部,将会返回false。否则返回 true。返回值仅供参考。即使返回 false,你也可能继续写。但是写会缓存在内存里,所以不要做的太过分。最好的办法是等待drain 事件后,再写入数据。

let fs = require("fs");
let ws = fs.createWriteStream("2.txt",{
    flags:"w",
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入方法是同步的,但是写入文件的过程 异步的。在真正写入文件后还会执行我们的回调函数
 while(flag && count>0){
     console.log("before",count);
     flag = ws.write((count)+"","utf8",(function (i) {
         return ()=>console.log("after",i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事件
ws.on("drain",function () {
    console.log("drain");
    write();//654 321
});
ws.on("error",function (err) {
    console.log(err);
});
//如果已经不再需要写入了,可以调用end方法关闭写入流,一旦调用end方法之后则不能再写入
ws.end();
//write after end
//
ws.write("x");
双工流(Duplex streams)

双工流(Duplex streams)是同时实现了 Readable and Writable 接口。用法详见下文

let {Duplex} = require("stream");
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push("a"); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 标准输入流
//proces.stdout标准输出流
process.stdin.pipe(s).pipe(process.stdout);
转换流(Transform streams)

它的输出是从输入计算得来。 它实现了Readable 和 Writable 接口. 用法详见下文.

let {Transform}  = require("stream");
//转换流是实现数据转换的
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require("stream");
let fs = require("fs");
let rs = fs.createReadStream("./user.json");
//普通流里的放的是Buffer,对象流里放的对象
let toJSON = Transform({
    readableObjectMode:true,//就可以向可读流里放对象
    transform(chunk,encoding,cb){
        //向可读流里的缓存区里放
      this.push(JSON.parse(chunk.toString()));
    }
});
let outJSON = Transform({
    writableObjectMode:true,//就可以向可读流里放对象
    transform(chunk,encoding,cb){
      console.log(chunk);
      cb();
    }
});
rs.pipe(toJSON).pipe(outJSON);

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/107185.html

相关文章

  • PHP socket初探 --- 一些零碎细节的拾漏补缺

    摘要:原文前面可以说是弄了一系列的和多进程的一大坨内容,知识浅显代码粗暴风格简陋,总的说来,还是差了一些细节。今天,就一些漏掉的细节补充一下。最后,我补充一句是同步的,而不是异步。 原文:https://t.ti-node.com/thread/... 前面可以说是弄了一系列的php socket和多进程的一大坨内容,知识浅显、代码粗暴、风格简陋,总的说来,还是差了一些细节。今天,就一些漏...

    chengjianhua 评论0 收藏0
  • PHP socket初探 --- 先从一个简单的socket服务器开始

    摘要:原文地址的中文名字叫做套接字,这种东西就是对的封装。运行结果如下简单解析一下上述代码来说明一下服务器的流程首先,根据协议族或地址族套接字类型以及具体的的某个协议来创建一个。很容易受到攻击,造成拒绝服务。 [原文地址:https://blog.ti-node.com/blog...] socket的中文名字叫做套接字,这种东西就是对TCP/IP的封装。现实中的网络实际上只有四层而已,从上...

    miguel.jiang 评论0 收藏0
  • PHP socket初探 --- select系统调用

    摘要:原文地址在初探先从一个简单的服务器开始中依次讲解了三个逐渐进步的服务器只能服务于一个客户端的服务器利用可以服务于多个客户端的额服务器利用预派生进程服务于多个客户端的服务器最后一种服务器的进程模型基本上的大概原理其实跟我们常用的是非常 [原文地址:https://blog.ti-node.com/blog...] 在<PHP socket初探 --- 先从一个简单的socket服务器开始...

    springDevBird 评论0 收藏0
  • Hola~ 一款基于Electron的聊天软件

    摘要:前言本项目旨在从零到壹,制作一款界面精美的聊天软件。因为本人是开发,设计功底欠缺,所以软件设计的有点丑,如果有大神有更好的,欢迎。 Hola 前言 本项目旨在从零到壹,制作一款界面精美的聊天软件。 Github 地址因为已工作,所以可能没有多少时间来继续跟进这个项目了,项目可优化的点已在下文列出,欢迎大家 Fork 或 Star。 ps: 征 logo 一枚。因为本人是开发,设计功底...

    Kaede 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<