Bun

流是处理二进制数据的重要抽象概念,它允许您在不一次性将所有数据加载到内存中的情况下进行操作。它们常用于读取和写入文件、发送和接收网络请求以及处理大量数据。

Bun 实现了 Web API ReadableStreamWritableStream

Bun 还实现了 node:stream 模块,包括 ReadableWritableDuplex。有关完整文档,请参阅 Node.js 文档

要创建一个简单的 ReadableStream

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

可以使用 for await 语法逐块读取 ReadableStream 的内容。

for await (const chunk of stream) {
  console.log(chunk);
  // => "hello"
  // => "world"
}

直接 ReadableStream

Bun 实现了 ReadableStream 的优化版本,避免了不必要的数据复制和队列管理逻辑。使用传统的 ReadableStream,数据块会被入队。每个数据块都会被复制到一个队列中,并在队列中等待,直到流准备好发送更多数据。

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

使用直接 ReadableStream,数据块直接写入流。不会发生排队,也不需要将块数据克隆到内存中。controller API 已更新以反映这一点;您调用 .write 而不是 .enqueue()

const stream = new ReadableStream({
  type: "direct",
  pull(controller) {
    controller.write("hello");
    controller.write("world");
  },
});

当使用直接 ReadableStream 时,所有块排队都由目标处理。流的消费者接收到的正是传递给 controller.write() 的内容,没有任何编码或修改。

异步生成器流

Bun 还支持将异步生成器函数作为 ResponseRequest 的源。这是一种创建 ReadableStream 的简便方法,该流从异步源获取数据。

const response = new Response(async function* () {
  yield "hello";
  yield "world";
}());

await response.text(); // "helloworld"

您也可以直接使用 [Symbol.asyncIterator]

const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    yield "hello";
    yield "world";
  },
});

await response.text(); // "helloworld"

如果您需要对流进行更精细的控制,yield 将返回直接 ReadableStream 控制器。

const response = new Response({
  [Symbol.asyncIterator]: async function* () {
    const controller = yield "hello";
    await controller.end();
  },
});

await response.text(); // "hello"

Bun.ArrayBufferSink

Bun.ArrayBufferSink 类是一个快速的增量写入器,用于构建未知大小的 ArrayBuffer

const sink = new Bun.ArrayBufferSink();

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// ArrayBuffer(5) [ 104, 101, 108, 108, 111 ]

要改为将数据作为 Uint8Array 检索,请将 asUint8Array 选项传递给 start 方法。

const sink = new Bun.ArrayBufferSink();
sink.start({
  asUint8Array: true
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.write("l");
sink.write("o");

sink.end();
// Uint8Array(5) [ 104, 101, 108, 108, 111 ]

.write() 方法支持字符串、类型化数组、ArrayBufferSharedArrayBuffer

sink.write("h");
sink.write(new Uint8Array([101, 108]));
sink.write(Buffer.from("lo").buffer);

sink.end();

一旦调用 .end(),就不能再向 ArrayBufferSink 写入数据。但是,在缓冲流的上下文中,持续写入数据并定期 .flush() 内容(例如,写入 WriteableStream)非常有用。为了支持这一点,请将 stream: true 传递给构造函数。

const sink = new Bun.ArrayBufferSink();
sink.start({
  stream: true,
});

sink.write("h");
sink.write("e");
sink.write("l");
sink.flush();
// ArrayBuffer(5) [ 104, 101, 108 ]

sink.write("l");
sink.write("o");
sink.flush();
// ArrayBuffer(5) [ 108, 111 ]

.flush() 方法将缓冲的数据作为 ArrayBuffer(或 Uint8Array,如果 asUint8Array: true)返回,并清除内部缓冲区。

要手动设置内部缓冲区的大小(以字节为单位),请传递 highWaterMark 的值

const sink = new Bun.ArrayBufferSink();
sink.start({
  highWaterMark: 1024 * 1024, // 1 MB
});

参考