资讯专栏INFORMATION COLUMN

深入nodejs中流(stream)的理解

tianyu / 2982人阅读

摘要:等文件一旦打开,立刻执行写入操作发射一个缓存区清空的事件自定义可写流为了实现可写流,我们需要使用流模块中的构造函数。

流的基本概念及理解
流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另外一个地方
流非常重要,gulp,webpack,HTTP里的请求和响应,http里的socket都是流,包括后面压缩,加密等

流为什么这么好用还这么重要呢?

因为有时候我们不关心文件的主体内容,只关心能不能取到数据,取到数据之后怎么进行处理

对于小型的文本文件,我们可以把文件内容全部读入内存,然后再写入文件,比如grunt-file-copy

对于体积较大的二进制文件,比如音频、视频文件,动辄几个GB大小,如果使用这种方法,很容易使内存“爆仓”。

理想的方法应该是读一部分,写一部分,不管文件有多大,只要时间允许,总会处理完成,这里就需要用到流的概念

流是一个抽象接口,被Node中很多对象所实现,比如HTTP服务器request和response对象都是流

Node.js 中有四种基本的流类型:

Readable - 可读的流 (例如 fs.createReadStream()).

Writable - 可写的流 (例如 fs.createWriteStream()).

Duplex - 可读写的流 (例如 net.Socket).

Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

可以通过 require("stream") 加载 Stream 基类。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类
Readable streams可读流
可读流(Readable streams)是对提供数据的 源头(source)的抽象
可读流的例子包括:

HTTP responses, on the client :客户端请求

HTTP requests, on the server :服务端请求

fs read streams :读文件

zlib streams :压缩

crypto streams :加密

TCP sockets :TCP协议

child process stdout and stderr :子进程标准输出和错误输出

process.stdin :标准输入

所有的 Readable 都实现了 stream.Readable 类定义的接口

通过流读取数据

用Readable创建对象readable后,便得到了一个可读流

如果实现_read方法,就将流连接到一个底层数据源

流通过调用_read向底层请求数据,底层再调用流的push方法将需要的数据传递过来

当readable连接了数据源后,下游便可以调用readable.read(n)向流请求数据,同时监听readable的data事件来接收取到的数据

下面简单举个可读流的例子:

监听可读流的data事件,当你一旦开始监听data事件的时候,流就可以读文件的内容并且发射data,读一点发射一点读一点发射一点

默认情况下,当你监听data事件之后,会不停的读数据,然后触发data事件,触发完data事件后再次读数据

读的时候不是把文件整体内容读出来再发射出来的,而且设置一个缓冲区,大小默认是64K,比如文件是128K,先读64K发射出来,再读64K在发射出来,会发射两次

缓冲区的大小可以通过highWaterMark来设置

let fs = require("fs");
//通过创建一个可读流
let rs = fs.createReadStream("./1.txt",{
    flags:"r",//我们要对文件进行何种操作
    mode:0o666,//权限位
    encoding:"utf8",//不传默认为buffer,显示为字符串
    start:3,//从索引为3的位置开始读
    //这是我的见过唯一一个包括结束索引的
    end:8,//读到索引为8结束
    highWaterMark:3//缓冲区大小
});
rs.on("open",function () {
    console.log("文件打开");
});
rs.setEncoding("utf8");//显示为字符串
//希望流有一个暂停和恢复触发的机制
rs.on("data",function (data) {
    console.log(data);
    rs.pause();//暂停读取和发射data事件
    setTimeout(function(){
        rs.resume();//恢复读取并触发data事件
    },2000);
});
//如果读取文件出错了,会触发error事件
rs.on("error",function () {
    console.log("error");
});
//如果文件的内容读完了,会触发end事件
rs.on("end",function () {
    console.log("读完了");
});
rs.on("close",function () {
    console.log("文件关闭");
});

/**
文件打开
334
455
读完了
文件关闭
**/
可读流的简单实现
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on("open", function () {
    console.log("open");
});
rs.on("data", function (data) {
    console.log(data);
});
rs.on("end", function () {
    console.log("end");
});
rs.on("close", function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require("fs");
let EventEmitter = require("events");

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || "r";
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on("end", function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on("newListener", (type) => {
            if (type == "data") {
                this.flowing = true;
                this.read();
            }
            if (type == "readable") {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit("error", err);
                }
            }
            this.fd = fd;
            this.emit("open");
        });
    }

    read(n) {
        if (typeof this.fd != "number") {
            return this.once("open", () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit("readable");
                        }

                        this.emit("end");
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit("readable");
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit("readable");
                    }
                    return this.emit("end");
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit("close");
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on("data", (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on("drain", () => {
            this.resume();
        });
        this.on("end", () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;
自定义可读流
为了实现可读流,引用Readable接口并用它构造新对象

我们可以直接把供使用的数据push出去。

当push一个null对象就意味着我们想发出信号——这个流没有更多数据了

var stream = require("stream");
var util = require("util");
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+"");
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on("data", function(data){
    console.log("读到数据: " + data.toString());//no maybe
});
counter.on("end", function(data){
    console.log("读完了");
});
可读流的两种模式
Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式---自动流动还是手工流动。那如何触发这两种模式呢:

flowing mode: 注册事件data、调用resume方法、调用pipe方法

paused mode: 调用pause方法(没有pipe方法)、移除data事件 && unpipe所有pipe

如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 "data" 事件,或是取消了 "data" 事件监听,就有可能出现这种情况

可读流的三种状态

在任意时刻,任意可读流应确切处于下面三种状态之一:

readable._readableState.flowing = null

readable._readableState.flowing = false

readable._readableState.flowing = true

两种模式取决于可读流flowing状态:

若为true : flowing mode;

若为false : paused mode

flowing mode

通过注册data、pipe、resume可以自动获取所需要的数据,我们来看下源码的实现
// data事件触发flowing mode
 if (ev === "data") {
    // Start flowing on next tick if stream isn"t explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === "readable") {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume触发flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug("resume");
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法触发flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}
flowing mode的三种方法最后均是通过resume方法,将状态变为true:state.flowing = true

paused mode

在paused mode下,需要手动地读取数据,并且可以直接指定读取数据的长度
可以通过监听事件readable,触发时手工读取chunk数据:

当你监听 readable事件的时候,会进入暂停模式

当监听readable事件的时候,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;

self.read(0); 只填充缓存,但是并不会发射data事件,但是会发射stream.emit("readable");事件

this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节

let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
    highWaterMark:3
});
rs.on("readable",function(){
    console.log(rs._readableState.length);
    //read如果不加参数表示读取整个缓存区数据
    //读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //当你读完指定的字节后,如果可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});
注意:一旦注册了readable事件,必须手工读取read数据,否则数据就会流失,我们来看下源码的实现
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug("emitReadable", state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug("emit readable");
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit("readable");
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug("flow", state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug("endReadable", state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug("read", n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug("read: emitReadable", state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }
flow方法直接read数据,将得到的数据通过事件data交付出去,然而此处没有注册data事件监控,因此,得到的chunk数据并没有交付给任何对象,这样数据就白白流失了,所以在触发emit("readable")时,需要提前read数据
Writable streams可写流
可写流是对数据写入"目的地"的一种抽象
Writable:可写流的例子包括了:

HTTP requests, on the client 客户端请求

HTTP responses, on the server 服务器响应

fs write streams 文件

zlib streams 压缩

crypto streams 加密

TCP sockets TCP服务器

child process stdin 子进程标准输入

process.stdout, process.stderr 标准输出,错误输出

下面举个可写流的简单例子

当你往可写流里写数据的时候,不是会立刻写入文件的,而是会很写入缓存区,缓存区的大小就是highWaterMark,默认值是16K。然后等缓存区满了之后再次真正的写入文件里

let fs = require("fs");
let ws = fs.createWriteStream("./2.txt",{
   flags:"w",
   mode:0o666,
   start:3,
   highWaterMark:3//默认是16K
});

如果缓存区已满 ,返回false,如果缓存区未满,返回true

如果能接着写,返回true,如果不能接着写,返回false

按理说如果返回了false,就不能再往里面写了,但是如果你真写了,如果也不会丢失,会缓存在内存里。等缓存区清空之后再从内存里读出来

let flag = ws.write("1");
console.log(flag);//true
flag =ws.write("2");
console.log(flag);//true
flag =ws.write("3");
console.log(flag);//false
flag =ws.write("4");
console.log(flag);//false

"drain" 事件

如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 "drain" 事件,这时才可以继续向流中写入数据

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 "drain" 事件就会被触发

建议, 一旦 write() 返回 false, 在 "drain" 事件触发前, 不能写入任何数据块

举个简单的例子说明一下:

let fs = require("fs");
let ws = fs.createWriteStream("2.txt",{
    flags:"w",
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入方法是同步的,但是写入文件的过程是异步的。在真正写入文件后还会执行我们的回调函数
 while(flag && count>0){
     console.log("before",count);
     flag = ws.write((count)+"","utf8",(function (i) {
         return ()=>console.log("after",i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事件
ws.on("drain",function () {
    console.log("drain");
    write();//654 321
});
ws.on("error",function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
如果已经不再需要写入了,可以调用end方法关闭写入流,一旦调用end方法之后则不能再写入
比如在ws.end();后写ws.write("x");,会报错write after end

"pipe"事件

linux精典的管道的概念,前者的输出是后者的输入

pipe是一种最简单直接的方法连接两个stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动

这个方法从可读流拉取所有数据, 并将数据写入到提供的目标中

自动管理流量,将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存

默认情况下,当源数据流触发 end的时候调用end(),所以写入数据的目标不可再写。传 { end:false }作为options,可以保持目标流打开状态

pipe方法的原理

var fs = require("fs");
var ws = fs.createWriteStream("./2.txt");
var rs = fs.createReadStream("./1.txt");
rs.on("data", function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on("drain", function () {
    rs.resume();
});
rs.on("end", function () {
    ws.end();
});
下面举个简单的例子说明一下pipe的用法:
let fs = require("fs");
let rs = fs.createReadStream("./1.txt",{
  highWaterMark:3
});
let ws = fs.createWriteStream("./2.txt",{
    highWaterMark:3
});
rs.pipe(ws);
//移除目标可写流
rs.unpipe(ws);

当监听可读流data事件的时候会触发回调函数的执行

可以实现数据的生产者和消费者速度的均衡

rs.on("data",function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});

监听可写流缓存区清空事件,当所有要写入的数据写入完成后,接着恢复从可读流里读取并触发data事件

ws.on("drain",function () {
    console.log("drain");
    rs.resume();
});

unpipe

readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离

如果写入的目标没有传入, 则所有绑定的流都会被分离

如果指定了写入的目标,但是没有绑定流,则什么事情都不会发生

简单距离说明下unpipe的用法:
let fs = require("fs");
var from = fs.createReadStream("./1.txt");
var to = fs.createWriteStream("./2.txt");
from.pipe(to);
setTimeout(() => {
console.log("关闭向2.txt的写入");
from.unpipe(writable);
console.log("手工关闭文件流");
to.end();
}, 1000);
pipe的简单实现
let fs = require("fs");
let ReadStream = require("./ReadStream");
let rs = ReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    highWaterMark: 3
});
let FileWriteStream = require("./WriteStream");
let ws = FileWriteStream("./2.txt",{
    flags:"w",
    encoding:"utf8",
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on("data", (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on("drain", ()=>{
        this.resume();
    });
    this.on("end", ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
自定义管道流
const stream = require("stream")

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log("push", ++index)
            this.push(index+"");
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log("写入:", chunk.toString())
    }
})
readable.pipe(writable);
可写流的简单实现
let fs = require("fs");
 let FileWriteStream = require("./FileWriteStream");
 let ws = FileWriteStream("./2.txt",{
     flags:"w",
     encoding:"utf8",
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1","utf8",(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on("drain",()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require("events");
let util = require("util");
let fs = require("fs");
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||"utf8";
    this.flags = options.flags || "w";
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//开始写入的索引位置
    this.open();//打开文件进行操作
    this.writing = false;//没有在写入过程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //如果监听到end事件,而且要求自动关闭的话则关闭文件
    this.on("end", function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit("error",err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit("error", err);
        this.fd = fd;//把文件描述符赋给当前实例的fd属性
        //发射open事件
        this.emit("open", fd);
    });
}
/**
 * 会判断当前是后台是否在写入过程中,如果在写入过程中,则把这个数据放在待处理的缓存中,如果不在写入过程中,可以直接写。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把数据放在缓存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有当缓存区写满了,那么清空缓存区的时候才会发射drain事件,否则 不发放
    this.needDrain = isFull;
    //如果说文件还没有打开,则把写入的方法压入open事件的监听函数。等文件一旦打开,立刻执行写入操作
    if (typeof this.fd !== "number") {
         this.once("open", () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit("error",err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //发射一个缓存区清空的事件
        this.emit("drain");
        this.writing = false;
    }
}
module.exports = WriteStream;
自定义可写流
为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写

chunk通常是一个buffer,除非我们配置不同的流。

encoding是在特定情况下需要的参数,通常我们可以忽略它。

callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数

var stream = require("stream");
var util = require("util");
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString("utf8"));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("项目:" + i, "utf8");
}
w.end("结束写入",function(){
    console.log(stock);
});
Duplex streams可读写的流(双工流)
Duplex 流是同时实现了 Readable 和 Writable 接口的流
双工流的可读性和可写性操作完全独立于彼此,这仅仅是将两个特性组合成一个对象

Duplex 流的实例包括了:

TCP sockets

zlib streams

crypto streams

下面简单实现双工流:
const {Duplex} = require("stream");
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+"");
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams转换流
变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 Readable 和 Writable 接口

转换流的输出是从输入中计算出来的
对于转换流,我们不必实现read或write的方法,我们只需要实现一个transform方法,将两者结合起来。它有write方法的意思,我们也可以用它来push数据

变换流的实例包括:

zlib streams

crypto streams

下面简单实现转换流:
const {Transform} = require("stream");
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);
对象流
默认情况下,流处理的数据是Buffer/String类型的值。有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象
const {Transform} = require("stream");
let fs = require("fs");
let rs = fs.createReadStream("./users.json");
rs.setEncoding("utf8");
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/107224.html

相关文章

  • Node.js 中流操作实践

    摘要:事件的触发频次同样是由实现者决定,譬如在进行文件读取时,可能每行都会触发一次而在请求处理时,可能数的数据才会触发一次。如果有参数传入,它会让可读流停止流向某个特定的目的地,否则,它会移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文节选自 Node.js Chea...

    chaos_G 评论0 收藏0
  • Node.js中流使用

    摘要:流是基于事件的用于管理和处理数据而且有不错的效率借助事件和非阻塞库流模块允许在其可用的时候动态处理在其不需要的时候释放掉使用流的好处举一个读取文件的例子使用同步读取一个文件程序会被阻塞所有的数据都会被读取到内存中换用读取文件程序不会被阻塞但 流是基于事件的API,用于管理和处理数据,而且有不错的效率.借助事件和非阻塞I/O库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉. ...

    h9911 评论0 收藏0
  • 重读 Gulp

    摘要:当接收一个回调函数的时候,一定要注意回调函数中的参数。主要作用就是用来读取文件或者文件夹中的数据。表示文件的名称指的是发生的变化使用技巧的进一步使用,可以参照中文官网中的技巧集。 Gulp 简介 Gulp 对现在的前端而言,是一个稍微老旧的工具了,但是,为了复习以前学过的内容,还是把它翻出来,放在自己的博客中。说不定哪天又用到了呢。 需要说明的是,这里使用的 Gulp 版本是 3.9....

    vpants 评论0 收藏0
  • Node事件机制小记

    摘要:事件的监听与事件的触发事件一事件机制的实现中大部分的模块,都继承自模块。从另一个角度来看,事件侦听器模式也是一种事件钩子的机制,利用事件钩子导出内部数据或状态给外部调用者。的核心就是事件发射与事件监听器功能的封装。 nodejs事件的监听与事件的触发 nodejs事件(Events)showImg(https://segmentfault.com/img/bV0Sqi?w=692&h=...

    airborne007 评论0 收藏0
  • 通过源码解析 Node.js 中导流(pipe)实现

    摘要:回调函数中检测该次写入是否被缓冲,若是,触发事件。若目标可写流表示该写入操作需要进行缓冲,则立刻将源可读流切换至暂停模式。监听源可读流的事件,相应地结束目标可写流。 在Node.js中,流(Stream)是其众多原生对象的基类,它对处理潜在的大文件提供了支持,也抽象了一些场景下的数据处理和传递。在它对外暴露的接口中,最为神奇的,莫过于导流(pipe)方法了。鉴于近期自己正在阅读Node...

    defcon 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<