Bun.serve()
支持服务器端 WebSockets,具有即时压缩、TLS 支持和 Bun 原生的发布订阅 API。
⚡️ 7 倍更高的吞吐量 — Bun 的 WebSockets 速度很快。对于 Linux x64 上的 简单聊天室,Bun 每秒可以处理比 Node.js + "ws"
多 7 倍的请求。
每秒发送的消息 | 运行时 | 客户端 |
---|---|---|
~700,000 | (Bun.serve ) Bun v0.2.1 (x64) | 16 |
~100,000 | (ws ) Node v18.10.0 (x64) | 16 |
Bun 的 WebSocket 实现内部构建在 uWebSockets 之上。
启动 WebSocket 服务器
下面是一个使用 Bun.serve
构建的简单 WebSocket 服务器,其中所有传入请求都在 fetch
处理程序中升级到 WebSocket 连接。套接字处理程序在 websocket
参数中声明。
Bun.serve({
fetch(req, server) {
// upgrade the request to a WebSocket
if (server.upgrade(req)) {
return; // do not return a Response
}
return new Response("Upgrade failed :(", { status: 500 });
},
websocket: {}, // handlers
});
支持以下 WebSocket 事件处理程序
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
message(ws, message) {}, // a message is received
open(ws) {}, // a socket is opened
close(ws, code, message) {}, // a socket is closed
drain(ws) {}, // the socket is ready to receive more data
},
});
专为速度设计的 API
每个处理程序的第一个参数是处理事件的 ServerWebSocket
实例。ServerWebSocket
类是 WebSocket
的快速、Bun 原生实现,并具有一些附加功能。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
message(ws, message) {
ws.send(message); // echo back the message
},
},
});
发送消息
每个 ServerWebSocket
实例都有一个 .send()
方法,用于向客户端发送消息。它支持多种输入类型。
ws.send("Hello world"); // string
ws.send(response.arrayBuffer()); // ArrayBuffer
ws.send(new Uint8Array([1, 2, 3])); // TypedArray | DataView
标头
升级成功后,Bun 会根据 规范 发送 101 Switching Protocols
响应。可以在 server.upgrade()
调用中将其他 headers
附加到此 Response
。
Bun.serve({
fetch(req, server) {
const sessionId = await generateSessionId();
server.upgrade(req, {
headers: {
"Set-Cookie": `SessionId=${sessionId}`,
},
});
},
websocket: {}, // handlers
});
上下文数据
可以在 .upgrade()
调用中将上下文 data
附加到新的 WebSocket。此数据在 WebSocket 处理程序内的 ws.data
属性中提供。
type WebSocketData = {
createdAt: number;
channelId: string;
authToken: string;
};
// TypeScript: specify the type of `data`
Bun.serve<WebSocketData>({
fetch(req, server) {
// use a library to parse cookies
const cookies = parseCookies(req.headers.get("Cookie"));
server.upgrade(req, {
// this object must conform to WebSocketData
data: {
createdAt: Date.now(),
channelId: new URL(req.url).searchParams.get("channelId"),
authToken: cookies["X-Token"],
},
});
return undefined;
},
websocket: {
// handler called when a message is received
async message(ws, message) {
const user = getUserFromToken(ws.data.authToken);
await saveMessageToDatabase({
channel: ws.data.channelId,
message: String(message),
userId: user.id,
});
},
},
});
要从浏览器连接到此服务器,请创建一个新的 WebSocket
。
const socket = new WebSocket("ws://localhost:3000/chat");
socket.addEventListener("message", event => {
console.log(event.data);
})
识别用户 — 当前在页面上设置的 cookie 将随 WebSocket 升级请求一起发送,并在 fetch
处理程序中的 req.headers
中提供。解析这些 cookie 以确定连接用户的身份,并相应设置 data
的值。
发布/订阅
Bun 的 ServerWebSocket
实现为基于主题的广播实现了一个本机发布-订阅 API。各个套接字可以 .subscribe()
到一个主题(使用字符串标识符指定),并向该主题的所有其他订阅者(不包括它自己).publish()
消息。此基于主题的广播 API 类似于 MQTT 和 Redis Pub/Sub。
const server = Bun.serve<{ username: string }>({
fetch(req, server) {
const url = new URL(req.url);
if (url.pathname === "/chat") {
console.log(`upgrade!`);
const username = getUsernameFromReq(req);
const success = server.upgrade(req, { data: { username } });
return success
? undefined
: new Response("WebSocket upgrade error", { status: 400 });
}
return new Response("Hello world");
},
websocket: {
open(ws) {
const msg = `${ws.data.username} has entered the chat`;
ws.subscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
message(ws, message) {
// this is a group chat
// so the server re-broadcasts incoming message to everyone
server.publish("the-group-chat", `${ws.data.username}: ${message}`);
},
close(ws) {
const msg = `${ws.data.username} has left the chat`;
ws.unsubscribe("the-group-chat");
server.publish("the-group-chat", msg);
},
},
});
console.log(`Listening on ${server.hostname}:${server.port}`);
调用 .publish(data)
会将消息发送给主题的所有订阅者,除了 调用 .publish()
的套接字。要向主题的所有订阅者发送消息,请使用 Server
实例上的 .publish()
方法。
const server = Bun.serve({
websocket: {
// ...
},
});
// listen for some external event
server.publish("the-group-chat", "Hello world");
压缩
可以使用 perMessageDeflate
参数启用逐消息 压缩。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
// enable compression and decompression
perMessageDeflate: true,
},
});
可以通过将 boolean
作为第二个参数传递给 .send()
来为各个消息启用压缩。
ws.send("Hello world", true);
有关压缩特性的细粒度控制,请参阅 参考。
反压
ServerWebSocket
的 .send(message)
方法返回一个 number
,表示操作的结果。
-1
— 消息已排队,但有反压0
— 由于连接问题,消息已丢失1+
— 已发送的字节数
这让你可以更好地控制服务器中的反压。
超时和限制
默认情况下,如果 WebSocket 连接空闲 120 秒,Bun 将关闭该连接。这可以通过 idleTimeout
参数进行配置。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
idleTimeout: 60, // 60 seconds
// ...
},
});
如果 Bun 收到大于 16 MB 的消息,它也将关闭 WebSocket 连接。这可以通过 maxPayloadLength
参数进行配置。
Bun.serve({
fetch(req, server) {}, // upgrade logic
websocket: {
maxPayloadLength: 1024 * 1024, // 1 MB
// ...
},
});
连接到 Websocket
服务器
Bun 实现了 WebSocket
类。若要创建一个连接到 ws://
或 wss://
服务器的 WebSocket 客户端,请创建一个 WebSocket
实例,就像在浏览器中一样。
const socket = new WebSocket("ws://localhost:3000");
在浏览器中,当前在页面上设置的 cookie 将随 WebSocket 升级请求一起发送。这是 WebSocket
API 的一项标准功能。
为了方便,Bun 允许你在构造函数中直接设置自定义标头。这是 WebSocket
标准的 Bun 特有扩展。这在浏览器中不起作用。
const socket = new WebSocket("ws://localhost:3000", {
headers: {
// custom headers
},
});
若要向套接字添加事件侦听器
// message is received
socket.addEventListener("message", event => {});
// socket opened
socket.addEventListener("open", event => {});
// socket closed
socket.addEventListener("close", event => {});
// error handler
socket.addEventListener("error", event => {});
参考
namespace Bun {
export function serve(params: {
fetch: (req: Request, server: Server) => Response | Promise<Response>;
websocket?: {
message: (
ws: ServerWebSocket,
message: string | ArrayBuffer | Uint8Array,
) => void;
open?: (ws: ServerWebSocket) => void;
close?: (ws: ServerWebSocket) => void;
error?: (ws: ServerWebSocket, error: Error) => void;
drain?: (ws: ServerWebSocket) => void;
maxPayloadLength?: number; // default: 16 * 1024 * 1024 = 16 MB
idleTimeout?: number; // default: 120 (seconds)
backpressureLimit?: number; // default: 1024 * 1024 = 1 MB
closeOnBackpressureLimit?: boolean; // default: false
sendPings?: boolean; // default: true
publishToSelf?: boolean; // default: false
perMessageDeflate?:
| boolean
| {
compress?: boolean | Compressor;
decompress?: boolean | Compressor;
};
};
}): Server;
}
type Compressor =
| `"disable"`
| `"shared"`
| `"dedicated"`
| `"3KB"`
| `"4KB"`
| `"8KB"`
| `"16KB"`
| `"32KB"`
| `"64KB"`
| `"128KB"`
| `"256KB"`;
interface Server {
pendingWebsockets: number;
publish(
topic: string,
data: string | ArrayBufferView | ArrayBuffer,
compress?: boolean,
): number;
upgrade(
req: Request,
options?: {
headers?: HeadersInit;
data?: any;
},
): boolean;
}
interface ServerWebSocket {
readonly data: any;
readonly readyState: number;
readonly remoteAddress: string;
send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
close(code?: number, reason?: string): void;
subscribe(topic: string): void;
unsubscribe(topic: string): void;
publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
isSubscribed(topic: string): boolean;
cork(cb: (ws: ServerWebSocket) => void): void;
}