当前位置:职场发展 > Nodejs Stream 数据流使用手册 [node.js]

Nodejs Stream 数据流使用手册 [node.js]

  • 发布:2023-10-06 14:01

Web 前端|js 教程
no dejs_stream
Web 前端 js 教程
1.介绍
酒店订餐系统源码、Ubuntu启用端口监控、爬虫喂食盒拆箱、ddos php、如何认识seolzw
本文介绍了使用node.js流开发程序的基本方法。
美食小程序源码,如何在ubuntu中传输视频,配置环境变量tomcat,爬虫抓取报告,培训机构php技术外包,seo富阳lzw
》我们应该有一些像花园水管这样的程序连接方式——当需要以另一种方式处理数据时,拧入另一个段。这也是 IO 的方式。”Doug McIlroy。 1964年10月11日
最早接触Stream是从早期的Unix开始。几十年的实践证明,Stream的思想可以非常简单地开发一些庞大的系统。在unix中,Stream是通过|来实现的;在node中,作为内置的stream模块,使用了很多核心模块和第三方模块。和Unix一样,Node Stream的主要操作也是.pipe()。用户可以利用抗压机制来控制读写的平衡。
淘宝店APP源码,如何用VScode删除文件夹,ubuntu绑定网卡,关闭tomcat端口命令,网络爬虫体验,php查杀软件,山东济南网络营销seo推广lzw
流可以提供给开发者可以复用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。

2。为什么使用 Stream

node中的I/O是异步的,所以读写磁盘和网络需要回调函数来读取数据。以下是文件下载服务器的简单代码:

var http = require('http');var fs = require('fs');var server = http.createServer(function (req, res) {fs.readFile(__dirname + '/data.txt', function (err, data) {res.end(data);});});server.listen(8000);
这些代码可以实现所需的功能,但是服务在发送之前需要缓存整个文件数据如果“data.txt”文件很大,并发量很大,就会浪费大量内存。由于用户需要等到整个文件缓存到内存中之后才接受文件数据,这导致用户体验相当差。但幸运的是,两个参数(req, res)都是Stream,所以我们可以使用fs.createReadStream()来代替fs.readFile():

var http = require('http');var fs = require('fs');var server = http.createServer(function (req, res) {var stream = fs.createReadStream(__dirname + '/data. txt');stream.pipe(res);});server.listen(8000);
.pipe()方法监听fs.createReadStream()的'data'和'end'事件,因此“data .txt "该文件不需要缓存整个文件,客户端连接完成后可以立即发送一个数据块给客户端。使用.pipe()的另一个好处是可以解决客户端延迟很大时导致的读写不平衡问题。如果您想在发送之前压缩文件,可以使用第三方模块:

var http = require('http');var fs = require('fs');var oppressor = require('oppressor');var server = http.createServer(function (req, res) {var stream = fs.createReadStream(__dirname + '/data.txt');stream.pipe(oppressor(req)).pipe(res);});server.listen(8000);
这样文件将支持 gzip 和deflate 浏览器压缩。 oppressor 模块处理所有内容编码。

Stream 使程序开发变得简单。

3。基本概念

有五种基本流:可读、可写、转换、双工和“经典”。

3-1、管道

所有类型的 Stream 集合都使用 .pipe() 创建输入输出对,接收可读流 src 并将其数据输出到可写流 dst,如下:

src.pipe(dst)
.pipe(dst)方法返回dst流,这样可以连续使用多个.pipe(),如下:

a.pipe( b ).pipe( c ).pipe( d )
功能与以下代码相同:

a.pipe( b );b.pipe( c );c.pipe( d );
3-2、可读流

通过调用 Readable 流的 .pipe() 方法,可以将 Readable 流的数据写入 Writable、Transform 或 Duplex 流。

可读Stream.pipe( dst )
1>创建可读流

这里我们创建一个可读的流!

var Readable = require('stream').Readable;var rs = new Readable;rs.push('beep');rs.push('boop\n');rs.push(null);rs. pipeline(process.stdout);$ node read0.jsbeep boop
rs.push( null ) 通知数据接收方数据已发送。

请注意,我们没有调用 rs.pipe(process.stdout);在将所有数据内容推入可读流之前,但是我们推入的所有数据内容仍然完全输出。这是因为可读流中所有推送的数据都会被缓存,直到接收者读取到数据。但很多情况下,最好只在接收到数据时才将数据推送到可读流,而不是缓存整个数据。让我们重写 ._read() 函数:

var Readable = require('stream').Readable;var rs = Readable();var c = 97;rs._read = function () {rs.push(String.fromCharCode(c++));if (c > 'z'.charCodeAt(0)) rs.push(null);};rs.pipe(process.stdout);$node read1.jsabcdefghijklmnopqrstuvwxyz
上面的代码重写了_read()方法就是实现只有当数据接收方请求数据时才将数据推送到可读流中。 _read()方法还可以接收一个size参数,表示数据请求所请求的数据大小,但是可读流可以根据需要忽略该参数。

注意,我们还可以使用 util.inherits() 继承可读流。为了说明_read()方法只有在数据接收方请求数据时才会被调用,我们在将数据推入可读流时做了一个延迟,如下:

var Readable = require('stream').Readable;var rs = Readable();var c = 97 - 1;rs._read = function () {if (c >= 'z'.charCodeAt(0) ) return rs.push(null);setTimeout(function () {rs.push(String.fromCharCode(++c));}, 100);};rs.pipe(process.stdout);process.on(' exit', function () {console.error('\n_read() 调用 ' + (c - 97) + ' times');});process.stdout.on('error', process.exit);
使用以下命令运行程序,我们发现_read()方法只被调用了5次:

$ 节点 read2.js | head -c5abcde_read() 调用了5次
使用定时器的原因是系统需要时间发送信号来通知程序关闭管道。 process.stdout.on(‘error’, fn) 用于处理系统发送 SIGPIPE 信号,因为 header 命令关闭了管道,因为这会导致 process.stdout 触发 EPIPE 事件。如果你想创建一个可以推送任意形式数据的可读流,只需在创建流时将参数objectMode设置为true即可,例如:Readable({ objectMode: true })。

2>读取可读流数据

大多数情况下我们只是使用管道方法将数据从可读流重定向到另一种形式的流,但在某些情况下直接从可读流读取数据可能更有用。如下:

process.stdin.on('可读', function () {var buf = www.sychzs.cn();console.dir(buf);});$ (echo abc; sleep 1; echo def; sleep 1; 回显 ghi) | node Consumer0.js null
当可读流中有数据需要读取时,流会触发 'readable' 事件,从而可以调用 .read() 方法读取相关数据。当可读流中没有数据可供读取时,.read() 将返回 null,这样就可以结束对 .read() 的调用,等待下一个 'readable' 事件被触发。以下是使用 .read(n) 从标准输入一次读取 3 个字节的示例:

process.stdin.on('readable', function () {var buf = www.sychzs.cn(3);console.dir(buf);});
运行程序如下,发现输出结果不完全是!

$(回声 abc;睡眠 1;回声 def;睡眠 1;回声 ghi)| node Consumer1.js 
这是由于流的内部缓冲区中残留了额外的数据数据,我们需要通知Stream我们想要读取更多的数据,read(0)可以达到这个目的。

process.stdin.on('可读', function () {var buf = www.sychzs.cn(3);console.dir(buf);www.sychzs.cn(0);});
此操作结果如下:

$(回声 abc;睡眠 1;回声 def;睡眠 1;回声 ghi)| node Consumer2.js 
我们可以使用.unshift()将数据推回到流数据队列的头部,这样读取就可以继续检索返回的数据。例如,以下代码将逐行输出标准输入的内容:

var offset = 0;process.stdin.on('可读', function () {var buf = www.sychzs.cn();if (!buf) return;for (; offset < buf.length; offset++ ) {if (buf[offset] === 0x0a) {console.dir(buf.slice(0, offset).toString());buf = buf.slice(offset + 1);offset = 0;process.stdin .unshift(buf);return;}}process.stdin.unshift(buf);});$ tail -n +50000 /usr/share/dict/american-english |头-n10 | nodelines.js 'hearties' 'heartiest''heartily''heartiness''heartiness\'s''heartland''heartland\'s''heartlands''heartless''heartless'
当然还有很多模块可以实现这个功能的如:split。

3-3、可写流

可写流只能用作.pipe()函数的目标参数。代码如下:

src.pipe( writableStream );
1>创建可写流

重写 ._write(chunk, enc, next) 方法以接受来自可读流的数据。

var Writable = require('stream').Writable;var ws = Writable();ws._write = function (chunk, enc, next) {console.dir(chunk);next();};process. stdin.pipe(ws);$(回声蜂鸣声;睡眠 1;回声 boop)| node write0.js 
第一个参数chunk是数据输入器写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入器可以写入更多时间。如果可读流写入字符串,则该字符串默认会转换为Buffer。如果创建流时设置了 Writable({decodeStrings: false }) 参数,则不会执行任何转换。如果可读流写入的数据是一个对象,那么就需要这样创建可写流

Writable({ objectMode: true })
2>将数据写入可写流

调用可写流的.write(data)方法完成数据写入。

process.stdout.write('beep boop\n');
调用.end()方法通知可写流数据已写入。

var fs = require('fs');var ws = fs.createWriteStream('message.txt');ws.write('beep');setTimeout(function () {ws.end('boop\n) ');}, 1000);$nodewriting1.js$catmessage.txtbeepboop
如果需要设置可写流的缓冲区大小,那么在创建流的时候需要设置opts.highWaterMark,所以如果 buffer 中的数据超过 opts.highWaterMark,.write(data) 方法将返回 false。当缓冲区可写时,可写流将触发“drain”事件。

3-4、经典直播

经典流是一个比较古老的接口,首先出现在node 0.4中,但是了解其运行原理还是非常有用的
。当一个流注册了“data”事件返回函数时,该流将工作在旧版本模式下,即它将使用旧的API。

1>经典可读流

经典的可读流事件是一个事件触发器。如果经典可读流有数据要读取,则会触发“data”事件。当数据被读取时,会触发“end”事件。 .pipe()方法通过检查www.sychzs.cn的值来确定流是否有数据要读取。以下是使用经典可读流打印字母 A-J 的示例:

var Stream = require('stream');var stream = new Stream;www.sychzs.cn = true;var c = 64;var iv = setInterval(function () {if (++c >= 75) {clearInterval (iv);stream.emit('end');}else stream.emit('data', String.fromCharCode(c));}, 100);stream.pipe(process.stdout);$ node classic0.jsABCDEFGHIJ 
如果你想从经典可读流中读取数据,只需注册“data”和“end”事件的回调函数即可。代码如下:

process.stdin.on('data', function (buf) {console.log(buf);});process.stdin.on('end', function () {console.log('__END__') ;});$(回声蜂鸣声;睡眠 1;回声 boop)| node classic1.js __END__
需要注意的是,如果使用这种方法读取数据,你将失去使用新接口的好处。例如,当你向一个延迟非常大的流写入数据时,你需要注意读数据和写数据之间的平衡。否则,内存中会缓存大量数据,造成内存大量浪费。一般此时强烈建议使用流的.pipe()方法,这样就不用自己去监听“data”和“end”事件,也不用担心阅读和写作的不平衡。当然,你也可以自己使用through来监听“数据”和“结束”事件,比如下面的代码:

var through = require('through');process.stdin.pipe(through(write, end));function write (buf) {console.log(buf);}function end () {console.log( '__END__');}$ (echo beep; sleep 1; echo boop) | node through.js __END__
或者可以使用 concat-stream 来缓存整个流的内容:

var concat = require('concat-stream');process.stdin.pipe(concat(function (body) {console.log(JSON.parse(body));}));$ echo '{"beep ":"噗"}' | node concat.js { beep: 'boop' }
当然,如果你必须自己监听“data”和“end”事件,那么你可以在流不可写的情况下写入数据。使用 .pause() 方法暂停 Classic 可读流并继续触发“data”事件。等待写入数据的流可写,然后使用 .resume() 方法通知流继续触发“data”事件继续读取
数据。

2>经典可写流

经典的可写流非常简单。只有三个方法:.write(buf)、.end(buf) 和 .destroy()。 .end(buf) 方法的 buf 参数是可选的。如果选择该参数,则相当于stream.write(buf);等操作流.end()。需要注意的是,当流缓冲区已满时,流不可写时的 write(buf) 方法将返回 false。如果流再次可写,则流将触发耗尽事件。

4、变形

transform是一个流,对读取的数据进行过滤,然后输出。

5、复式

www.sychzs.cn
网站部分内容来自网友。如有侵权,请联系我们删除,970928#www.sychzs.cn

相关文章

热门推荐