你使用过 Node.js 的流(streams)吗?你的使用体验如何?
当我第一次尝试使用流时,说实话,我感到非常困惑。这个概念对我来说完全陌生。我本以为可以直接忽略它们,但事实证明流在 Node.js 中无处不在。即使是像 fs 和 http 这样的核心模块在底层也使用了流。所以,我不得不学习并理解它们的工作原理。
对我有帮助的是建立一个由多个概念组成的强大思维模型。在本文中,我们将一起探索这些概念,并建立对 Node.js 流的思维模型。
什么是 Node.js 流?
流的主要思想是它们从一个地方获取数据片段并将其传输到另一个地方。基于这个定义,我想强调 4 个重要部分:
- 流以片段形式传输数据,而不是整体传输
- 流传输特定大小的数据片段
- 流对传输的数据本身不感兴趣
- 流仅仅提供数据传输的机制
描述流的一个常见类比是管道。但是,这个类比经常忽略了两个关键部分:生产者和消费者。让我们使用相同的类比,但使其更完整。
想象一个巨大的水库,附近有一所房子。要向房子供水,你需要从水库铺设一条管道到你的家。
这个类比说明了流的三个关键部分:
- 水库是水的生产者
- 管道是将水从水库传输到家的流
- 你的家是水的消费者
回到 Node.js 流。让我们比较管道类比与它们的行为:
- 管道不会一次性传输整个水库的水
- 管道以其能处理的特定大小传输水
- 管道对水本身不感兴趣,它只是传输水的一种方式
- 管道只是一种从一个地方到另一个地方传输水的机制
看起来和 Node.js 流很相似,对吧?
什么时候使用 Node.js 流?
在深入了解流的具体细节之前,让我们先理解它们的使用场景。
实时数据处理
当我们处理部分数据或随时间增量生成的数据时,流的效果很好。流对于处理增量生成或随时间分部分接收的数据非常有效。
一个理想的例子是 WebSocket 协议。简而言之,它是一个允许在客户端和服务器之间建立双向通信通道的协议。
我们将在接下来的文章中详细介绍这个协议。我们以 WS 库为例。它大量使用了流。这里有一个例子,其中名为 Sender 的抽象实现了背压机制。
网络交互
每次使用 Node.js API 创建服务器时,你都在创建一个双工流。Node.js 中的 HTTP 模块使用名为 Socket 的抽象来创建与网络套接字的连接。这个 Socket 抽象继承自 Duplex 流。
ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype);
ObjectSetPrototypeOf(Socket, stream.Duplex);
当你看到如下构造时:
import { createServer } from 'http';
const server = createServer();
要知道在底层,你正在创建一个双工流。
处理大型数据集
想象你有一个 100GB 大小的文件。你需要解析它并处理一些数据。你会怎么做?
如果你尝试使用像 readFileSync 或 readFile 这样的 API 来读取文件,你的程序会崩溃。
import { readFileSync, readFile } from 'fs';
const largeFilePath = 'path/to/large/file.txt';
// 这两种方式都会使程序崩溃
const data = readFileSync(largeFilePath);
const asyncData = await readFile(largeFilePath);
问题在于你试图使用这些读取 API 将整个文件内容加载到内存中。这听起来一点都不高效。相反,我们可以分块处理文件内容。
import { createReadStream } from 'fs';
const largeFilePath = 'path/to/large/file.txt';
const stream = createReadStream(largeFilePath);
stream.on('data', (chunk) => {
// 在这里处理数据块
});
使用这种方法,我们不需要等待整个文件加载到内存中。每当一个数据块准备好时,我们就处理它。
数据转换
之前的所有例子都是关于从某处读取数据或将数据写入某处的情况。但我们也可以使用流来转换已经在内存中的数据。
一个很好的例子是数据压缩/解压缩。这里是一个来自 Node.js 文档中 zlib 模块的例子。
async function do_gzip(input, output) {
const gzip = createGzip();
// 创建一个读取流来从输入读取数据
const source = createReadStream(input);
// 创建一个写入流来将数据写入输出
const destination = createWriteStream(output);
// 将源流通过 gzip 流传输到目标流
await pipe(source, gzip, destination);
}
在这段代码中,我们创建了一个读取流,每当数据从这个读取流来时,我们就将它传递给 gzip。当 gzip 流压缩数据时,我们将其传递给写入流。
你现在不需要完全理解这段代码是如何工作的。只需要理解流可以用来转换不同的数据。
不要在这种情况下使用流
当你要处理的数据已经在内存中时,你不需要使用流。在这种情况下使用流几乎没有任何好处。
所以请尽量避免在所有需要的数据已经在内存中时使用流。不要让你的生活变得更困难。
Node.js 流的核心概念
你已经理解了什么是流,何时使用它们,以及何时不使用它们。现在,你准备深入了解 Node.js 流的一些核心概念。
事件驱动架构
你知道流就像管道。但是是什么让它们以这种方式工作呢?这要归功于流所基于的事件驱动概念。特别是,Node.js 中的所有流都继承自 EventEmitter 类。
EventEmitter 的工作方式非常简单。它有一些内部状态,用于存储所有事件及其监听器。
class EventEmitter {
// 事件和它们的监听器的映射
// 每个事件可以有多个监听器
#events = new Map<string, (() => void)[]>();
// 为事件注册新的监听器
on(eventName: string, callback: () => void) {
if (!this.#events.has(eventName)) {
this.#events.set(eventName, [callback]);
}
this.#events.get(eventName).push(callback);
}
// 触发与事件相关的所有监听器
emit(eventName: string) {
const listeners = this.#events.get(eventName);
if (!listeners) {
return;
}
listeners.forEach((listener) => listener());
}
}
这是一个非常简化的版本,但它让你了解 EventEmitter 是如何工作的。你可以在 Node.js 源代码中阅读完整的实现。
当您使用流时,可以为一些预定义的事件集添加监听器。
stream.on('data', () => {});
在这个例子中,我们为数据事件添加了一个监听器。每当一块数据准备好时,流会调用 emit 方法并传递数据事件名称,所有监听器都会被调用。
这正是使流像管道一样工作的机制,我们从一端获取数据,并将其传递到另一端。
背压机制 (Backpressure)
back-pressure这个词是来源于工程概念,当气流或液体在管道中运输时,由于管道变细或者受到其他阻碍,导致出现了下游向上游的逆向压力,这种情况就称为back pressure,也称作向后的压力。
在计算机行业,back pressure 通常用来描述当数据在传输中由于下层的buffer满了,导致上层服务无法继续接收数据的现象。
流可以用来高效地处理大型数据集。但是有一个问题:如果数据生产的速率太高,以至于在某个时间点,我们程序中的数据超过了分配的内存容量怎么办?没错,程序会崩溃。
这意味着仅仅流的抽象概念还不足以防止这种情况发生。流为这种情况提供了背压机制。
背压听起来可能很高深,但实际上很简单。背压的主要思想是我们对一次可以处理多少数据有一定限制。
让我们回到读取大文件的例子。这个过程有两个我们感兴趣的部分:数据的生产者和数据的消费者。数据的生产者是底层操作系统机制,它读取文件并生成数据。
如果生产者试图推送过多的数据,流可以向生产者发出信号,表示它需要减缓速度,因为目前无法接收更多数据。但是流是如何知道它已经满了呢?
每个流都有一个内部缓冲区,随着新数据的到来和旧数据的退出,“缓冲”机制就发挥作用。
缓冲
每个流都有一个内部缓冲区。如果我们使用启用了背压机制的 API,那么这个缓冲区用于存储进入流的数据。
如果数据进入流但没有流出,缓冲区会稳定填充,直到达到上限。在这种情况下,上限是为每个单独的流设置的 highWaterMark
属性。
这里是一个在读取文件时设置 highWaterMark 属性的例子。
import { createReadStream } from 'node:fs';
const filePath = 'path/to/file.txt';
const writeStream = createReadStream(filePath, { highWaterMark: 1024 });
createReadStream 函数的 highWaterMark 默认设置为 64KB。当内部缓冲区释放一些空间时,流可以开始从源读取更多数据。
管道和链式操作
在或多或少复杂的 Node.js 应用程序中,你需要转换来自流的数据或将这些数据发送到其他目的地。在这种情况下,一个称为"管道"的概念就派上用场了。
你可以创建一个流链,其中一个流连接到另一个流,当数据进入链中的第一个流时,它会通过整个流链。如果你熟悉响应式编程和像 RxJS 这样的东西,那么这个概念应该对你来说很熟悉。
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream';
const source = createReadStream('path/to/file.txt');
const destination = createWriteStream('path/to/file.txt.gz');
const gzip = createGzip();
await pipeline(source, gzip, destination);
在这个例子中,source
流触发整个管道。过程如下:
- 源流从文件读取数据
- 源流将数据传递给 gzip 流
- gzip 流压缩数据
- gzip 流将压缩后的数据传递给目标流
- 目标流将压缩后的数据写入文件
- 整个管道完成
管道的每个阶段都有自己的内部缓冲区和背压机制。这意味着如果 gzip 流无法处理来自源流的数据,它可以向源流发出信号要求其放慢速度。目标流也是如此。
结论
无论你是否显式使用它们,流都是任何 Node.js 应用程序的核心。它也是 Node.js 中最强大的功能之一。流在 Node.js 中的许多不同地方都有使用,从网络交互到文件处理。
当你需要处理大型数据集或处理实时数据时,它们特别有用。流的核心思维模型围绕以下概念构建:
- 随时间变化的数据
- 事件驱动架构
- 背压
- 缓冲
- 管道和链式操作
通过理解这些概念并清楚地了解流在概念层面上如何运作,你可以构建更高效的 Node.js 应用程序。