流是处理二进制数据而不必一次性将所有数据加载到内存中的重要抽象。它们通常用于读写文件、发送和接收网络请求以及处理大量数据。
Bun 实现了 Web API ReadableStream 和 WritableStream。
Bun 还实现了 node:stream 模块,包括 Readable、Writable 和 Duplex。有关完整文档,请参阅 Node.js 文档。
创建一个简单的 ReadableStream
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
ReadableStream 的内容可以通过 for await 语法逐块读取。
for await (const chunk of stream) {
console.log(chunk);
// => "hello"
// => "world"
}
ReadableStream 还提供了便捷方法来消费整个流
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello world");
controller.close();
},
});
const data = await stream.text(); // => "hello world"
// Also available: .json(), .bytes(), .blob()
直接 ReadableStream
Bun 实现了一个优化版的 ReadableStream,它避免了不必要的数据复制和队列管理逻辑。传统的 ReadableStream 中,数据块被_入队_。每个数据块都被复制到队列中,然后等待流准备好发送更多数据。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
通过直接 ReadableStream,数据块直接写入流中。没有发生排队,也无需将数据块克隆到内存中。controller API 已更新以反映这一点;您调用 .write 而不是 .enqueue()。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});
使用直接 ReadableStream 时,所有数据块排队都由目标处理。流的消费者接收到的正是传递给 controller.write() 的内容,没有任何编码或修改。
异步生成器流
Bun 还支持异步生成器函数作为 Response 和 Request 的来源。这是一种创建 ReadableStream 以从异步源获取数据的简单方法。
const response = new Response(
(async function* () {
yield "hello";
yield "world";
})(),
);
await response.text(); // "helloworld"
您也可以直接使用 [Symbol.asyncIterator]。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"
如果您需要对流进行更精细的控制,yield 将返回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"
Bun.ArrayBufferSink
Bun.ArrayBufferSink 类是一个用于构造未知大小 ArrayBuffer 的快速增量写入器。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
要改为将数据检索为 Uint8Array,请将 asUint8Array 选项传递给 start 方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ].write() 方法支持字符串、类型化数组、ArrayBuffer 和 SharedArrayBuffer。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();
一旦调用 .end(),就不能再向 ArrayBufferSink 写入数据。但是,在缓冲流的上下文中,连续写入数据并定期 .flush() 内容(例如,到 WritableStream 中)很有用。为了支持这一点,请将 stream: true 传递给构造函数。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush() 方法将缓冲数据作为 ArrayBuffer(如果 asUint8Array: true 则为 Uint8Array)返回并清除内部缓冲区。
要手动设置内部缓冲区的大小(以字节为单位),请为 highWaterMark 传递一个值
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});
参考