流是处理二进制数据的重要抽象概念,它允许您在不一次性将所有数据加载到内存中的情况下进行操作。它们常用于读取和写入文件、发送和接收网络请求以及处理大量数据。
Bun 实现了 Web API ReadableStream
和 WritableStream
。
Bun 还实现了 node:stream
模块,包括 Readable
、Writable
和 Duplex
。有关完整文档,请参阅 Node.js 文档。
要创建一个简单的 ReadableStream
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
可以使用 for await
语法逐块读取 ReadableStream
的内容。
for await (const chunk of stream) {
console.log(chunk);
// => "hello"
// => "world"
}
直接 ReadableStream
Bun 实现了 ReadableStream
的优化版本,避免了不必要的数据复制和队列管理逻辑。使用传统的 ReadableStream
,数据块会被入队。每个数据块都会被复制到一个队列中,并在队列中等待,直到流准备好发送更多数据。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
使用直接 ReadableStream
,数据块直接写入流。不会发生排队,也不需要将块数据克隆到内存中。controller
API 已更新以反映这一点;您调用 .write
而不是 .enqueue()
。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});
当使用直接 ReadableStream
时,所有块排队都由目标处理。流的消费者接收到的正是传递给 controller.write()
的内容,没有任何编码或修改。
异步生成器流
Bun 还支持将异步生成器函数作为 Response
和 Request
的源。这是一种创建 ReadableStream
的简便方法,该流从异步源获取数据。
const response = new Response(async function* () {
yield "hello";
yield "world";
}());
await response.text(); // "helloworld"
您也可以直接使用 [Symbol.asyncIterator]
。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"
如果您需要对流进行更精细的控制,yield
将返回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"
Bun.ArrayBufferSink
Bun.ArrayBufferSink
类是一个快速的增量写入器,用于构建未知大小的 ArrayBuffer
。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
要改为将数据作为 Uint8Array
检索,请将 asUint8Array
选项传递给 start
方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]
.write()
方法支持字符串、类型化数组、ArrayBuffer
和 SharedArrayBuffer
。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();
一旦调用 .end()
,就不能再向 ArrayBufferSink
写入数据。但是,在缓冲流的上下文中,持续写入数据并定期 .flush()
内容(例如,写入 WriteableStream
)非常有用。为了支持这一点,请将 stream: true
传递给构造函数。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush()
方法将缓冲的数据作为 ArrayBuffer
(或 Uint8Array
,如果 asUint8Array: true
)返回,并清除内部缓冲区。
要手动设置内部缓冲区的大小(以字节为单位),请传递 highWaterMark
的值
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});
参考