流是一种重要的抽象,用于处理二进制数据,而无需一次性将其全部加载到内存中。它们通常用于读写文件、发送和接收网络请求以及处理大量数据。
Bun 实现 Web API ReadableStream
和 WritableStream
。
Bun 还实现了 node:stream
模块,包括 Readable
、Writable
和 Duplex
。有关完整文档,请参阅 Node.js 文档。
要创建一个简单的 ReadableStream
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
可以使用 for await
语法逐块读取 ReadableStream
的内容。
for await (const chunk of stream) {
console.log(chunk);
// => "hello"
// => "world"
}
直接 ReadableStream
Bun 实现了一个经过优化的 ReadableStream
版本,避免了不必要的 data 复制和队列管理逻辑。使用传统的 ReadableStream
,data 块会入队。每个块都会复制到一个队列中,该队列会一直保留,直到流准备好发送更多数据。
const stream = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
使用直接 ReadableStream
时,数据块会直接写入流。不会发生排队,也不需要将块数据克隆到内存中。controller
API 已更新以反映这一点;您调用 .write
,而不是 .enqueue()
。
const stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hello");
controller.write("world");
},
});
使用直接 ReadableStream
时,所有块排队都由目标处理。流的使用者会准确收到传递给 controller.write()
的内容,没有任何编码或修改。
异步生成器流
Bun 还支持异步生成器函数作为 Response
和 Request
的源。这是一种创建从异步源获取数据的 ReadableStream
的简单方法。
const response = new Response(async function* () {
yield "hello";
yield "world";
}());
await response.text(); // "helloworld"
您还可以直接使用 [Symbol.asyncIterator]
。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
yield "hello";
yield "world";
},
});
await response.text(); // "helloworld"
如果您需要对流进行更精细的控制,yield
将返回直接 ReadableStream 控制器。
const response = new Response({
[Symbol.asyncIterator]: async function* () {
const controller = yield "hello";
await controller.end();
},
});
await response.text(); // "hello"
Bun.ArrayBufferSink
Bun.ArrayBufferSink
类是一个快速增量写入器,用于构建大小未知的 ArrayBuffer
。
const sink = new Bun.ArrayBufferSink();
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]
要以 Uint8Array
的形式检索数据,请将 asUint8Array
选项传递给 start
方法。
const sink = new Bun.ArrayBufferSink();
sink.start({
asUint8Array: true
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");
sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]
.write()
方法支持字符串、类型化数组、ArrayBuffer
和 SharedArrayBuffer
。
sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);
sink.end();
一旦调用 .end()
,就不能再向 ArrayBufferSink
写入更多数据。但是,在缓冲流的上下文中,连续写入数据并定期 .flush()
内容(例如,写入 WriteableStream
)很有用。要支持此功能,请将 stream: true
传递给构造函数。
const sink = new Bun.ArrayBufferSink();
sink.start({
stream: true,
});
sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]
sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]
.flush()
方法将缓冲数据作为 ArrayBuffer
(如果 asUint8Array: true
,则为 Uint8Array
)返回,并清除内部缓冲区。
要手动设置内部缓冲区的字节大小,请传递 highWaterMark
的值
const sink = new Bun.ArrayBufferSink();
sink.start({
highWaterMark: 1024 * 1024, // 1 MB
});
参考