Bun

WebSockets

Bun.serve() 支持服务器端 WebSockets,具有即时压缩、TLS 支持和 Bun 原生的发布/订阅 API。

⚡️ 吞吐量提高 7 倍 — Bun 的 WebSockets 非常快速。在 Linux x64 上,对于一个简单的聊天室,Bun 每秒处理的请求数是 Node.js + "ws" 的 7 倍。

每秒发送的消息数运行时客户端
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16

Bun 的 WebSocket 实现内部基于 uWebSockets 构建。

启动 WebSocket 服务器

下面是用 Bun.serve 构建的一个简单的 WebSocket 服务器,其中所有传入的请求都在 fetch 处理程序中升级为 WebSocket 连接。套接字处理程序在 websocket 参数中声明。

Bun.serve({
  fetch(req, server) {
    // upgrade the request to a WebSocket
    if (server.upgrade(req)) {
      return; // do not return a Response
    }
    return new Response("Upgrade failed", { status: 500 });
  },
  websocket: {}, // handlers
});

支持以下 WebSocket 事件处理程序

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    message(ws, message) {}, // a message is received
    open(ws) {}, // a socket is opened
    close(ws, code, message) {}, // a socket is closed
    drain(ws) {}, // the socket is ready to receive more data
  },
});

专为速度而设计的 API

每个处理程序的第一个参数是处理该事件的 ServerWebSocket 实例。ServerWebSocket 类是 WebSocket 的一个快速、Bun 原生的实现,并包含一些附加功能。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    message(ws, message) {
      ws.send(message); // echo back the message
    },
  },
});

发送消息

每个 ServerWebSocket 实例都有一个 .send() 方法,用于将消息发送到客户端。它支持多种输入类型。

ws.send("Hello world"); // string
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView

标头

一旦升级成功,Bun 将按照 规范发送一个 101 Switching Protocols 响应。可以在 server.upgrade() 调用中将其他 headers 附加到此 Response

Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: {
        "Set-Cookie": `SessionId=${sessionId}`,
      },
    });
  },
  websocket: {}, // handlers
});

上下文数据

上下文 data 可以通过 .upgrade() 调用附加到新的 WebSocket。这些数据在 WebSocket 处理程序中的 ws.data 属性中可用。

要为 ws.data 进行强类型设置,请向 websocket 处理程序对象添加一个 data 属性。这将为所有生命周期钩子中的 ws.data 进行类型设置。

type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // this object must conform to WebSocketData
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript: specify the type of ws.data like this
    data: {} as WebSocketData,

    // handler called when a message is received
    async message(ws, message) {
      // ws.data is now properly typed as WebSocketData
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});

注意:以前,您可以使用 Bun.serve 上的类型参数来指定 ws.data 的类型,例如 Bun.serve<MyData>({...})。由于 TypeScript 的限制,此模式已删除,转而使用上面显示的 data 属性。

要从浏览器连接到此服务器,请创建一个新的 WebSocket

browser.js
const socket = new WebSocket("ws://:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
})

识别用户 — 当前设置在页面上的 cookie 将随 WebSocket 升级请求一起发送,并在 fetch 处理程序中的 req.headers 中可用。解析这些 cookie 以确定连接用户的身份,并相应地设置 data 的值。

发布/订阅

Bun 的 ServerWebSocket 实现了一个原生的发布/订阅 API,用于基于主题的广播。单个套接字可以 .subscribe() 到一个主题(用字符串标识符指定),并 .publish() 消息到该主题的所有其他订阅者(不包括自身)。这个基于主题的广播 API 类似于 MQTTRedis Pub/Sub

const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`upgrade!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success
        ? undefined
        : new Response("WebSocket upgrade error", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    // TypeScript: specify the type of ws.data like this
    data: {} as { username: string },

    open(ws) {
      const msg = `${ws.data.username} has entered the chat`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // this is a group chat
      // so the server re-broadcasts incoming message to everyone
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);
    },
    close(ws) {
      const msg = `${ws.data.username} has left the chat`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`Listening on ${server.hostname}:${server.port}`);

调用 .publish(data) 会将消息发送给主题的所有订阅者,除了调用 .publish() 的套接字。要将消息发送给主题的所有订阅者,请使用 Server 实例上的 .publish() 方法。

const server = Bun.serve({
  websocket: {
    // ...
  },
});

// listen for some external event
server.publish("the-group-chat", "Hello world");

压缩

可以通过 perMessageDeflate 参数启用每个消息的压缩

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    // enable compression and decompression
    perMessageDeflate: true,
  },
});

通过将 boolean 作为第二个参数传递给 .send(),可以为单个消息启用压缩。

ws.send("Hello world", true);

有关压缩特性的精细控制,请参阅 参考

背压

ServerWebSocket.send(message) 方法返回一个 number,表示操作的结果。

  • -1 — 消息已入队,但存在背压
  • 0 — 由于连接问题,消息已被丢弃
  • 1+ — 发送的字节数

这为您在服务器中更好地控制背压提供了帮助。

超时和限制

默认情况下,如果 WebSocket 连接空闲 120 秒,Bun 将关闭它。这可以通过 idleTimeout 参数进行配置。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    idleTimeout: 60, // 60 seconds

    // ...
  },
});

如果收到大于 16 MB 的消息,Bun 也会关闭 WebSocket 连接。这可以通过 maxPayloadLength 参数进行配置。

Bun.serve({
  fetch(req, server) {}, // upgrade logic
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB

    // ...
  },
});

连接到 Websocket 服务器

Bun 实现 WebSocket 类。要创建连接到 ws://wss:// 服务器的 WebSocket 客户端,请创建一个 WebSocket 实例,就像在浏览器中一样。

const socket = new WebSocket("ws://:3000");

// With subprotocol negotiation
const socket2 = new WebSocket("ws://:3000", ["soap", "wamp"]);

在浏览器中,当前设置在页面上的 cookie 将随 WebSocket 升级请求一起发送。这是 WebSocket API 的标准功能。

为了方便起见,Bun 允许您直接在构造函数中设置自定义标头。这是 WebSocket 标准的 Bun 特定扩展。这在浏览器中不起作用。

const socket = new WebSocket("ws://:3000", {
  headers: {
    // custom headers
  },
});

客户端压缩

WebSocket 客户端支持 permessage-deflate 压缩。extensions 属性显示已协商的压缩。

const socket = new WebSocket("wss://echo.websocket.org");
socket.addEventListener("open", () => {
  console.log(socket.extensions); // => "permessage-deflate"
});

要为套接字添加事件监听器

// message is received
socket.addEventListener("message", event => {});

// socket opened
socket.addEventListener("open", event => {});

// socket closed
socket.addEventListener("close", event => {});

// error handler
socket.addEventListener("error", event => {});

Reference

namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (
        ws: ServerWebSocket,
        message: string | ArrayBuffer | Uint8Array,
      ) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // default: 16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // default: 120 (seconds)
      backpressureLimit?: number; // default: 1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // default: false
      sendPings?: boolean; // default: true
      publishToSelf?: boolean; // default: false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(
    topic: string,
    data: string | ArrayBufferView | ArrayBuffer,
    compress?: boolean,
  ): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}