博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Node.js 流的使用及实现
阅读量:5823 次
发布时间:2019-06-18

本文共 9276 字,大约阅读时间需要 30 分钟。

Node.js 中有四种基本的流类型:

  1. Readable - 可读的流 (例如 fs.createReadStream()).
  2. Writable - 可写的流 (例如 fs.createWriteStream()).
  3. Duplex - 可读写的流 (例如 net.Socket).
  4. Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

流的特点:

  1. 流的工作模式是典型的 生产消费者模式
  2. Readable和Writable都有缓冲区,在读写操作时都是先将数据放入缓冲区,缓冲区的大小由构造参数 highWaterMark 决定。
  3. 缓冲区满了以后,就形成了「背压」,返回 false 给消费者,待缓冲区被消费小于 highWaterMark 后,返回 true
  4. 流一般只对strings 和 Buffer进行操作。
  5. Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。
  6. 所有流都是 EventEmitter 的子类。

Readable Stream

可读流事实上工作在下面两种模式之一:flowing 和 paused 。

模式简述

  1. 在flowing状态下,会不停的发送data事件,给消费者使用数据。
  2. 在paused状态下,只能通过三种方法重新回来flowing状态。
    • 监听 data 事件。
    • 调用 resume() 方法。
    • 调用 pipe() 方法将数据发送到 Writable。
  3. 可以调用下面二种方法进用paused状态。
    • 不存在管道目标,调用 pause() 方法。
    • 存在管道目标,取消 'data' 事件监听,并调用 unpipe() 方法。

状态简述

若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。

readable._readableState.flowing = null

调用 readable.pause() 方法, readable.unpipe() 方法,或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。

readable._readableState.flowing = false

在null状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法,readable._readableState.flowing 的值将会变为 true 。

readable._readableState.flowing = true

一般使用代码:

let fs = require('fs');let path = require('path');let ReadStream = require('./ReadStream')// 返回的是一个可读流对象let rs = new ReadStream(path.join(__dirname, '1.txt'), {    flags: 'r', // 文件的操作是读取操作    encoding: 'utf8', // 默认是null null代表的是buffer    autoClose: true, // 读取完毕后自动关闭    highWaterMark: 3, // 默认是64k  64*1024b    start: 0,     //end:3 // 包前又包后});rs.on('open', function () {    console.log('文件打开了')});rs.on('end', function () {    console.log('读取结束了')});rs.on('close', function () {    console.log('文件关闭')});rs.on('error', function (err) {    console.log(err);});// flowing模式会一直触发data事件,直到读取完毕// rs.setEncoding('utf8');rs.on('data', function (data) { // 暂停模式 -> 流动模式    console.log(data);    // rs.pause(); // 暂停方法 切换至paused 模式});// 输出----// 文件打开了// 文件内容// 读取结束了// 文件关闭复制代码

模拟实现原码

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.flags = options.flags || 'r';        this.autoClose = options.autoClose || true;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.start = options.start || 0;        this.end = options.end;        this.encoding = options.encoding || null        this.flowing = null; // null就是暂停模式        // 看是否监听了data事件,如果监听了 就要变成流动模式        // 要建立一个buffer 这个buffer就是要一次读多少        this.buffer = Buffer.alloc(this.highWaterMark);        this.pos = this.start; // pos 读取的位置 可变 start不变的        this.on('newListener', (eventName, callback) => {            if (eventName === 'data') {                // 相当于用户监听了data事件                this.flowing = true;                // 监听了 就去读                this.read(); // 去读内容了            }        });                this.open(); //打开文件 fd    }    read() {        // 此时文件还没打开呢        if (typeof this.fd !== 'number') {            // 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd肯定有了            return this.once('open', () => this.read())        }        // 此时有fd了        // 应该填highWaterMark?        // 想读4个 写的是3  每次读3个        // 123 4        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark;        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {            // 读到了多少个 累加            if (bytesRead > 0) {                this.pos += bytesRead;                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);                this.emit('data', data);                // 当读取的位置 大于了末尾 就是读取完毕了                if (this.pos > this.end) {                    this.emit('end');                    this.destroy();                }                if (this.flowing) { // 流动模式继续触发                    this.read();                }            } else {                this.emit('end');                this.destroy();            }        });    }    resume() {        this.flowing = true;        this.read();    }    pause() {        this.flowing = false;    }    destroy() {        // 先判断有没有fd 有关闭文件 触发close事件        if (typeof this.fd === 'number') {            fs.close(this.fd, () => {                this.emit('close');            });            return;        }        this.emit('close'); // 销毁    };    open() {        // copy 先打开文件        fs.open(this.path, this.flags, (err, fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) { // 是否自动关闭                    this.destroy();                }                return;            }            this.fd = fd; // 保存文件描述符            this.emit('open'); // 文件打开了        });    }}module.exports = ReadStream;复制代码

Writable Stream

写入流的原理和读取流一样,这里简单叙述一下:

  1. 调用write()传入写入内容。
  2. 当达到highWaterMark时,再次调用会返回false。
  3. 当写入完成,并清空buffer时,会出发“drain"事件,这时可以再次写入。

一般使用方法:

let fs = require('fs');let path = require('path');let WriteStream = require('./WriteStream')let ws = new WriteStream(path.join(__dirname, '1.txt'), {    highWaterMark: 3,    autoClose: true,    flags: 'w',    encoding: 'utf8',    mode: 0o666,    start: 0,});let i = 9;function write() {    let flag = true;    while (i > 0 && flag) {        flag = ws.write(--i + '', 'utf8', () => {            console.log('ok')        });        console.log(flag)    }}write();// drain只有当缓存区充满后 并且被消费后触发ws.on('drain', function () {    console.log('抽干')    write();});复制代码

模拟实现原码

let EventEmitter = require('events');let fs = require('fs');class WriteStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.highWaterMark = options.highWaterMark || 16 * 1024;        this.autoClose = options.autoClose || true;        this.mode = options.mode;        this.start = options.start || 0;        this.flags = options.flags || 'w';        this.encoding = options.encoding || 'utf8';        // 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中        // 在源码中是一个链表 => []        this.buffers = [];        // 标识 是否正在写入        this.writing = false;        // 是否满足触发drain事件        this.needDrain = false;        // 记录写入的位置        this.pos = 0;        // 记录缓存区的大小        this.length = 0;        this.open();    }    destroy() {        if (typeof this.fd !== 'number') {            return this.emit('close');        }        fs.close(this.fd, () => {            this.emit('close')        })    }    open() {        fs.open(this.path, this.flags, this.mode, (err, fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                    this.destroy();                }                return            }            this.fd = fd;            this.emit('open');        })    }    write(chunk, encoding = this.encoding, callback = () => {}) {        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);        // write 返回一个boolean类型         this.length += chunk.length;        let ret = this.length < this.highWaterMark; // 比较是否达到了缓存区的大小        this.needDrain = !ret; // 是否需要触发needDrain        // 判断是否正在写入 如果是正在写入 就写入到缓存区中        if (this.writing) {            this.buffers.push({                encoding,                chunk,                callback            }); // []        } else {            // 专门用来将内容 写入到文件内            this.writing = true;            this._write(chunk, encoding, () => {                callback();                this.clearBuffer();            }); // 8        }        return ret;    }    clearBuffer() {        let buffer = this.buffers.shift();        if (buffer) {            this._write(buffer.chunk, buffer.encoding, () => {                buffer.callback();                this.clearBuffer()            });        } else {            this.writing = false;            if (this.needDrain) { // 是否需要触发drain 需要就发射drain事件                this.needDrain = false;                this.emit('drain');            }        }    }    _write(chunk, encoding, callback) {        if (typeof this.fd !== 'number') {            return this.once('open', () => this._write(chunk, encoding, callback));        }        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {            this.length -= byteWritten;            this.pos += byteWritten;            callback(); // 清空缓存区的内容        });    }}module.exports = WriteStream;复制代码

Duplex Stream

双工流,又能读,又能写

一般使用方法:

let {Duplex} = require('stream');let d = Duplex({    read() {        this.push('hello');        this.push(null)    },    write(chunk, encoding, callback) {        console.log("write:" + chunk.toString());        callback();    }});d.on('data', function (data) {    console.log("read:" + data.toString());});d.write('hello');复制代码

Transform

一般使用方法:

let {Transform} =  require('stream');// 他的参数和可写流一样let tranform1 = Transform({    transform(chunk,encoding,callback){        let a = chunk.toString().toUpperCase();        this.push(a); // 将输入的内容放入到可读流中        //console.log(a)        callback();    }});let tranform2 = Transform({    transform(chunk,encoding,callback){        console.log(chunk.toString());        callback();    }});// 希望将输入的内容转化成大写在输出出来process.stdin.pipe(tranform1).pipe(tranform2);复制代码

转载地址:http://gmbdx.baihongyu.com/

你可能感兴趣的文章
urb传输的代码分析【转】
查看>>
理解 QEMU/KVM 和 Ceph(3):存储卷挂接和设备名称
查看>>
一道算法题的一种O(n)解法
查看>>
ABP理论学习之NHibernate集成
查看>>
反射之动态创建对象
查看>>
隐马尔可夫模型学习小记——forward算法+viterbi算法+forward-backward算法(Baum-welch算法)...
查看>>
[MFC] CList
查看>>
[Android Pro] 完美Android Cursor使用例子(Android数据库操作)
查看>>
4 张 GIF 图帮助你理解二叉查找树
查看>>
c++中sizeof的分析
查看>>
线程间操作无效: 从不是创建控件的线程访问它的解决方法
查看>>
hdu 1236 排名
查看>>
【爆牙游记】黄山归来不看岳-日出。
查看>>
PHP面向对象深入研究之【继承】,减少代码重复
查看>>
RBAC权限管理
查看>>
此博客不再发表对自己私事的看法
查看>>
后台(20)——数据库连接池
查看>>
C# 开机自动启动程序
查看>>
导致Asp.Net站点重启的10个原因
查看>>
v7000数据恢复_MDisk重建数据恢复方法(北亚数据恢复)
查看>>