Nodejs 包含一个内置模块 stream
,它允许我们处理流数据。在这篇文章中,我们将通过几个简单的例子来解释stream
的用法。我们还将描述在面对复杂情况构建高性能应用程序时如何构建管道来合并不同的流。 。
在深入了解应用程序构建之前,了解 Node.js stream
模块提供的功能非常重要。
让我们开始吧!
Node.js stream
提供四种类型的流
可读流(Readable Streams) 可写流(Duplex Streams) 转换流(Transform Streams)
让我们从高层次上看一下每种流类型。
可读流可以从特定数据源(最常见的是从文件系统)读取数据。 Node.js 应用程序中可读流的其他常见用途包括:
process.stdin
- 通过stdin
读取终端应用程序中的用户输入。 http.IncomingMessage
– 读取 HTTP 服务器中传入请求的内容或在 HTTP 客户端中读取服务器的 HTTP 响应。
您可以使用可写流将数据从应用程序写入特定位置,例如文件。
process.stdout
可用于将数据写入标准输出,并由 console.log
在内部使用。
接下来是双工流和变换流,它们可以定义为基于可读流和可写流的混合流类型。
双工流是可读流和可写流的组合。它可以将数据写入特定的位置并从数据源读取数据。双工流最常见的示例是 net.Socket
,它用于从套接字读取和写入数据。
需要注意的是,双工流的可读端和可写端的操作是相互独立的,数据不会从一端流向另一端。
转换流与双工流略有相似,但在转换流中,可读端和可写端是相关联的。
crypto.Cipher
类就是一个很好的例子,它实现了加密流。通过 crypto.Cipher
流,应用程序可以将纯文本数据写入流的可写端,并从流的可读端读取加密的密文。由于其转换属性,这种类型的流被称为转换流。
旁注:另一个转换流是stream.PassThrough
。stream.PassThrough
将数据从可写端传递到可读端,无需任何转换。这可能听起来多余,但直通流对于构建自定义流以及流管道非常有帮助。 (例如创建流数据的多个副本)
一旦可读流连接到生产数据源(例如文件),就可以通过多种方式从流中读取数据。
首先,创建一个名为myfile
的简单文本文件,大小为85字节,包含以下字符串:
现在,让我们看看从可读流中读取数据的两种不同方式。1。侦听
data
事件从可读流中读取数据的最常见方法是侦听流发出的
data
事件。以下代码演示了这种方法:const fs = require('fs')const 可读 = fs.createReadStream('./myfile', { highWaterMark: 20 });read.on('data', (chunk) => { console.log( `读取 ${chunk.length} 个字节\n"${chunk.toString()}"\n`);})
highWaterMark
属性作为选项传递给fs.createReadStream
,用于确定此流中缓冲了多少数据。然后数据被刷新到读取机制(在本例中为我们的data
处理程序)。默认情况下,fs
流的highWaterMark
值为 64kb。我们特意将该值重写为 20 字节,以触发多个data
事件。如果运行上述程序,它将分五次迭代从
myfile
读取 85 个字节。您将在控制台上看到以下输出:读取 20 字节“Lorem ipsum dolor si”读取 20 字节“t amet, consectetur”读取 20 字节“adipiscing elit. Cur”读取 20 字节“abitur nec mauris tu”读取 5 字节“rpi。”2 。使用异步迭代器
从可读流中读取数据的另一种方法是使用异步迭代器:
const fs = require('fs')const 可读 = fs.createReadStream('./myfile', { highWaterMark: 20 });(async () => { for wait (const chunk of Read) { console.log. log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); }})()如果运行此程序,您将得到与之前相同的结果示例输出。
可读的 Node.js 流状态
当侦听器侦听可读流的
检查流的“流动”状态data
事件时,流的状态切换到“流动”状态(除非流显式暂停)。您可以通过流对象的readFlowing
属性我们可以稍微修改一下前面的示例,并通过
数据
处理器进行演示:const fs = require('fs')const read = fs.createReadStream('./myfile', { highWaterMark: 20 });let bytesRead = 0console.log(`在附加 'data' 处理程序之前。正在流动: ${read.readFlowing}`);read.on('data', (chunk) => { console.log(`读取 ${chunk.length} 个字节`); bytesRead += chunk.length // 从 Stop 开始可读读完读流中的 60 个字节后进行读取 if (bytesRead === 60) { Readable.pause() console.log(`afterpause() call.isflowing: ${read.readFlowing}`); // 继续阅读等待 1 秒后 setTimeout(() => { read.resume() console.log(`resume() 调用后正在流动:${read.readFlowing}`); }, 1000) }} )console.log (`附加'data'处理程序后。正在流动:${read.readFlowing}`);在这个例子中,我们从可读流中读取
myfile
,但是在读取60个字节后,我们暂时暂停数据流 1 秒。我们还在不同时间打印了readFlowing
属性的值,以了解它是如何变化的。如果运行上述程序,您将得到以下输出:
在附加“数据”处理程序之前。正在流动:附加“数据”处理程序后为空。正在流动: trueRead 20 bytesRead 20 bytesRead 20 bytesafterpause() call。正在流动:resume() 调用后为 false。正在流动: trueRead 20 bytesRead 5 bytes我们可以用以下方式解释输出:
当我们的程序启动时,
连接到readableFlowing
的值为null
,因为我们没有提供任何消费流的机制。data
处理器后,ReadableFlowing 变为
true
。读取 60 个字节后,通过调用
pause()
暂停流,并且readableFlowing
也转换为false
。等待 1 秒后,通过调用
resume()
,流程再次切换到“流动”模式,并且readFlowing
更改为“true”。然后文件内容的其余部分流入流中。使用 Node.js 流处理大量数据
由于流,应用程序不需要在内存中保留大型二进制对象:可以在接收到较小的数据块时对其进行处理。
在这一部分中,让我们组合不同的流来构建一个可以处理大量数据的实际应用程序。我们将使用一个小型实用程序来生成给定文件的 SHA-256。
但首先,我们需要创建一个 4GB 的大假文件来测试。您可以使用简单的 shell 命令来完成此操作:
在 macOS 上:
mkfile -n 4g 4gb_file
在 Linux 上:xfs_mkfile 4096m 4gb_file
创建假文件
4gb_file
后,让我们在不使用stream
模块的情况下生成该文件的 SHA-256 哈希值。const fs = require("fs");const crypto = require("crypto");fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log( readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console. error(err) });});如果运行上面的代码,可能会出现以下错误:
RangeError [ERR_FS_FILE_TOO_LARGE]:FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE'}文件大小 (4294967296) 大于 2 GB上面发生错误这是因为 JavaScript 运行时无法处理随机大缓冲区。运行时可以处理的缓冲区的最大大小取决于您的操作系统体系结构。您可以使用内置
buffer
模块中的buffer.constants.MAX_LENGTH
变量来检查操作系统缓存的最大大小。即使不发生上述错误,将大文件保留在内存中也是有问题的。我们可用的物理内存量限制了我们的应用程序可以使用的内存量。高内存使用率还可能导致应用程序在 CPU 使用率方面表现不佳,因为垃圾收集可能会变得昂贵。
使用
pipeline()
减少APP的内存占用现在,让我们看看如何修改应用程序以使用流并避免遇到此错误:
const fs = require("fs");const crypto = require("crypto");const { pipeline } = require("stream");const hashStream = crypto.createHash("sha256");hashStream.setEncoding ('base64')const inputStream = fs.createReadStream("./4gb_file");const outputStream = fs.createWriteStream("./checksum.txt");pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) })在此示例中,我们使用
crypto.createHash
函数提供的流式传输方法。它返回一个“转换”流对象hashStream
,为随机大文件生成哈希。要将文件内容传输到此转换流中,我们使用
fs.createReadStream
为4gb_file
创建一个可读流inputStream。我们将
hashStream
转换流的输出传递到可写流outputStream
,并通过fs.create 通过 Write 创建
。checksum.txt
直播如果运行上述程序,您将在
checksum.txt
文件中看到 4GB 文件的 SHA-256 哈希值。使用
pipeline()
与pipe()的比较
在之前的案例中,我们使用
pipeline
函数来连接多个流。另一种常见的方法是使用.pipe()
函数,如下所示:inputStream .pipe(hashStream) .pipe(outputStream)但是有几个原因,所以不建议在生产应用中使用
.pipe()
。如果其中一个流关闭或发生错误,pipe()
不会自动销毁连接的流,这可能会导致应用程序内存泄漏。同样,pipe()
不会自动将流中的错误转发到一处进行处理。因为这些问题,就有了
pipeline()
,所以建议大家使用pipeline()
代替pipe()
来连接不同的溪流。我们可以重写上面的pipe()
示例来使用pipeline()
函数,如下所示:pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) })
pipeline()
接受回调函数作为最后一个参数。连接流中的任何错误都会触发此回调函数,因此可以轻松地在一处处理错误。摘要:使用 Node.js 流减少内存并提高性能
在 Node.js 中使用流可以帮助我们构建可以处理大数据的高性能应用程序。
在本文中,我们介绍:
四种类型的 Node.js 流(可读、可写、双工和转换)。如何通过侦听
data
事件或使用异步迭代器从可读流中读取数据。通过使用管道
连接多个流来减少内存占用。简短警告:您很可能不会遇到很多必须使用流的场景,并且基于流的解决方案会增加应用程序的复杂性。始终确保使用流的好处大于复杂性。
nodejs教学!