资讯专栏INFORMATION COLUMN

node那点事(二) -- Writable streams(可写流)、自定义流

mtunique / 556人阅读

摘要:可写流可写流是对数据写入目的地的一种抽象。对象流的特点就是它有一个标志,我们可以设置它让流可以接受任何对象。

可写流(Writable Stream)

可写流是对数据写入"目的地"的一种抽象。

可写流的原理其实与可读流类似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,当然即使缓存池满了,剩余的数据也是存在内存

可写流的简单用法如下代码

let fs = require("fs");
let path = require("path");
let ws = fs.createWriteStream(path.join(__dirname,"1.txt"),{
    highWaterMark:3,
    autoClose:true,
    flags:"w",
    encoding:"utf8",
    mode:0o666,
    start:0,
}); 
let i = 9;
function write(){
    let flag = true;
    while(i>0&&flag){
        flag = ws.write(--i+"","utf8",()=>{console.log("ok")});
        console.log(flag)
    }
}
write();
// drain只有当缓存区充满后 并且被消费后触发
ws.on("drain",function(){
    console.log("抽干")
    write();
});
实现原理

现在就让我们来实现一个简单的可写流,来研究可写流的内部原理,可写流有很多方法与可读流类似,这里不在重复了首先要有一个构造函数来定义一些基本选项属性,然后调用一个open放法打开文件,并且有一个destroy方法来处理关闭逻辑

let EventEmitter = require("events");
let fs = require("fs");

class WriteStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.autoClose = options.autoClose || true;
        this.mode = options.mode;
        this.start = options.start || 0;
        this.flags = options.flags || "w";
        this.encoding = options.encoding || "utf8";

        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
        // 在源码中是一个链表 => []
        this.buffers = [];

        // 标识 是否正在写入
        this.writing = false;

        // 是否满足触发drain事件
        this.needDrain = false;

        // 记录写入的位置
        this.pos = 0;

        // 记录缓存区的大小
        this.length = 0;
        this.open();
    }
    
    destroy() {
        if (typeof this.fd !== "number") {
            return this.emit("close");
        }
        fs.close(this.fd, () => {
            this.emit("close")
        });
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err,fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit("open");
        })
    }
}

module.exports = WriteStream;

接着我们实现write方法来让可写流对象调用,在write方法中我们首先将数据转化为buffer,接着实现一些事件的触发条件的逻辑,如果现在没有正在写入的话我们就要真正的进行写入操作了,这里我们实现一个_write方法来实现写入操作,否则则代表文件正在写入,那我们就将流传来的数据先放在缓存区中,保证写入数据不会同时进行。

write(chunk,encoding=this.encoding,callback=()=>{}){
    chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
    // write 返回一个boolean类型 
    this.length+=chunk.length; 
    let ret = this.length{
            callback();
            this.clearBuffer();
        }); // 8
    }
    return ret;
}

_write(chunk,encoding,callback){
    if(typeof this.fd !== "number"){
        return this.once("open",()=>this._write(chunk,encoding,callback));
    }
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
        this.length -= byteWritten;
        this.pos += byteWritten;
        
        callback(); // 清空缓存区的内容
    });
}
    

_write写入之后的回调中我们会调用传入回调函数clearBuffer,这个方法会去buffers中继续递归地把数据取出,然后继续调用_write方法去写入,直到全部buffer中的数据取出后,这样就清空了buffers。

clearBuffer(){
    let buffer = this.buffers.shift();
    if(buffer){
        this._write(buffer.chunk,buffer.encoding,()=>{
            buffer.callback();
            this.clearBuffer()
        });
    }else{
        this.writing = false;
        if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件
            this.needDrain = false;
            this.emit("drain");
        }
    }
}

最后附上完整的代码

let EventEmitter = require("events");
let fs = require("fs");
class WriteStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose = options.autoClose||true;
        this.mode = options.mode;
        this.start = options.start||0;
        this.flags = options.flags||"w";
        this.encoding = options.encoding || "utf8";

        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
        // 在源码中是一个链表 => []

        this.buffers = [];

        // 标识 是否正在写入
        this.writing = false;

        // 是否满足触发drain事件
        this.needDrain = false;

        // 记录写入的位置
        this.pos = 0;

        // 记录缓存区的大小
        this.length = 0;
        this.open();
    }
    destroy(){
        if(typeof this.fd !=="number"){
            return this.emit("close");
        }
        fs.close(this.fd,()=>{
            this.emit("close")
        })
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit("error",err);
                if(this.autoClose){
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit("open");
        })
    }
    write(chunk,encoding=this.encoding,callback=()=>{}){
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
        // write 返回一个boolean类型 
        this.length+=chunk.length; 
        let ret = this.length{
                callback();
                this.clearBuffer();
            }); // 8
        }
        return ret;
    }
    clearBuffer(){
        let buffer = this.buffers.shift();
        if(buffer){
            this._write(buffer.chunk,buffer.encoding,()=>{
                buffer.callback();
                this.clearBuffer()
            });
        }else{
            this.writing = false;
            if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件
                this.needDrain = false;
                this.emit("drain");
            }
        }
    }
    _write(chunk,encoding,callback){
        if(typeof this.fd !== "number"){
            return this.once("open",()=>this._write(chunk,encoding,callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.length -= byteWritten;
            this.pos += byteWritten;
            
            callback(); // 清空缓存区的内容
        });
    }
}

module.exports = WriteStream;
Pipe管道流

前面我们了解了可读流与可写流,那么怎么让二者结合起来使用呢,node给我们提供好了方法--Pipe管道,流顾名思义,就是在可读流与可写流中间加入一个管道,实现一边读取,一边写入,读一点写一点。

Pipe的使用方法如下

let fs = require("fs");
let path = require("path");
let ReadStream = require("./ReadStream");
let WriteStream = require("./WriteStream");

let rs = new ReadStream(path.join(__dirname, "./1.txt"), {
    highWaterMark: 4
});
let ws = new WriteStream(path.join(__dirname, "./2.txt"), {
    highWaterMark: 1
});
// 4 1
rs.pipe(ws); 
实现原理

Pipe的原理比较简单,简单说监听可读流的data事件来持续获取文件中的数据,然后我们就会去调用写流的write方法。如果可写流缓存区已满,那么当我们得到调用可读流的pause方法来暂停读取,然后等到写流的缓存区已经全部写入并且触发drain事件时,我们就会调用resume重新开启读取的流程。上代码

pipe(ws) {
    this.on("data", (chunk) => {
        let flag = ws.write(chunk);
        if (!flag) {
            this.pause();
        }
    });
    ws.on("drain", () => {
        this.resume();
    })
}
自定义流

Node允许我们自定义流,读流继承于Readable接口,写流则继承于Writable接口,所以我们其实是可以自定义一个流模块,只要继承stream模块对应的接口即可。

自定义可读流

如果我们要自定义读流的话,那我们就需要继承Readable,Readable里面有一个read()方法,默认调用_read(),所以我们只要复写了_read()方法就可实现读取的逻辑,同时Readable中也提供了一个push方法,调用push方法就会触发data事件,push中的参数就是data事件回调函数的参数,当push传入的参数为null的时候就代表读流停止,上代码

let { Readable } = require("stream");

// 想实现什么流 就继承这个流
// Readable里面有一个read()方法,默认掉_read()
// Readable中提供了一个push方法你调用push方法就会触发data事件
let index = 9;
class MyRead extends Readable {
    _read() {
        // 可读流什么时候停止呢? 当push null的时候停止
        if (index-- > 0) return this.push("123");
        this.push(null);
    }
}

let mr = new MyRead();
mr.on("data", function(data) {
    console.log(data);
});
自定义可写流

与自定义读流类似,自定义写流需要继承Writable接口,并且实现一个_write()方法,这里注意的是_write中可以传入3个参数,chunk, encoding, callback,chunk就是代表写入的数据,通常是一个buffer,encoding是编码类型,通常不会用到,最后的callback要注意,它并不是我们用这个自定义写流调用write时的回调,而是我们上面讲到写流实现时的clearBuffer函数。

let { Writable } = require("stream");

// 可写流实现_write方法
// 源码中默认调用的是Writable中的write方法
class MyWrite extends Writable {
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback(); // clearBuffer
    }
}

let mw = new MyWrite();
mw.write("111", "utf8", () => {
    console.log(1);
})
mw.write("222", "utf8", () => {
    console.log(1);
});
Duplex 双工流

双工流其实就是结合了上面我们说的自定义读流和自定义写流,它既能读也能写,同时可以做到读写之间互不干扰

let { Duplex } =  require("stream");

// 双工流 又能读 又能写,而且读取可以没关系(互不干扰)
let d = Duplex({
    read() {
        this.push("hello");
        this.push(null);
    },
    write(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});

d.on("data", function(data) {
    console.log(data);
});
d.write("hello");
Transform 转换流

转换流的本质就是双工流,唯一不同的是它并不需要像上面提到的双工流一样实现read和write,它只需要实现一个transform方法用于转换

let { Transform } =  require("stream");

// 它的参数和可写流一样
let tranform1 = Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk, encoding, callback){
        console.log(chunk.toString());
        callback();
    }
});

// 等待你的输入
// rs.pipe(ws);
// 希望将输入的内容转化成大写在输出出来
process.stdin.pipe(tranform1).pipe(tranform2);
// 对象流 可读流里只能放buffer或者字符串 对象流里可以放对象
对象流

默认情况下,流处理的数据是Buffer/String类型的值。对象流的特点就是它有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象。上代码

const { Transform } = require("stream");

let fs = require("fs");
let rs = fs.createReadStream("./users.json");

rs.setEncoding("utf8");

let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});

let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/94289.html

相关文章

  • 浅谈node.js中的stream()

    摘要:在可读流事件里我们就必须调用方法。当一个对象就意味着我们想发出信号这个流没有更多数据了自定义可写流为了实现可写流,我们需要使用流模块中的构造函数。我们只需给构造函数传递一些选项并创建一个对象。 前言 什么是流呢?看字面意思,我们可能会想起生活中的水流,电流。但是流不是水也不是电,它只是描述水和电的流动;所以说流是抽象的。在node.js中流是一个抽象接口,它不关心文件内容,只关注是否从...

    elliott_hu 评论0 收藏0
  • 通过源码解析 Node.js 中导(pipe)的实现

    摘要:回调函数中检测该次写入是否被缓冲,若是,触发事件。若目标可写流表示该写入操作需要进行缓冲,则立刻将源可读流切换至暂停模式。监听源可读流的事件,相应地结束目标可写流。 在Node.js中,流(Stream)是其众多原生对象的基类,它对处理潜在的大文件提供了支持,也抽象了一些场景下的数据处理和传递。在它对外暴露的接口中,最为神奇的,莫过于导流(pipe)方法了。鉴于近期自己正在阅读Node...

    defcon 评论0 收藏0
  • [译]关于Node.js streams你需要知道的一切

    摘要:当一个客户端的响应对象是一个可读流,那么在服务器端这就是一个可写流。的模块给我们提供了一个可以操作任何文件的可读流通过方法创建。创建一个可读流创建可读流,我们需要类创建一个可读流非常简单。可以通过修改可读流配置里面的方法实现。 Node.js的stream模块是有名的应用困难,更别说理解了。那现在可以告诉你,这些都不是问题了。 多年来,开发人员在那里创建了大量的软件包,其唯一目的就是使...

    bang590 评论0 收藏0
  • Node.js 中操作实践

    摘要:事件的触发频次同样是由实现者决定,譬如在进行文件读取时,可能每行都会触发一次而在请求处理时,可能数的数据才会触发一次。如果有参数传入,它会让可读流停止流向某个特定的目的地,否则,它会移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文节选自 Node.js Chea...

    chaos_G 评论0 收藏0
  • node点事(一) -- Readable streams(可读

    摘要:流的类型中有四种基本的流类型可读的流例如可写的流例如可读写的流例如在读写过程中可以修改和变换数据的流例如可读流可读流有两种模式流动模式可读流自动读取数据,通过接口的事件尽快将数据提供给应用。 流的简介 流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实...

    rickchen 评论0 收藏0

发表评论

0条评论

mtunique

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<