WebSocketsWebSockets

WebSockets

在掌握了服务器发送事件的单向通信之后,蝙蝠侠意识到他需要更强大的功能。当戈登局长想要在危机情况下与他实时聊天时,蝙蝠侠需要双向通信。

"SSE 很适合向我的仪表板推送更新,"蝙蝠侠想道,"但我需要双向通信来与我的盟友协调!"

为了实现双向实时通信,蝙蝠侠学习了如何使用 Robyn 的现代装饰器 API 处理 WebSocket。底层消息通过 Rust 通道传递,实现最大性能——消息分发过程中无需 Python GIL。

Request

WebSocket
/web_socket
from robyn import Robyn

app = Robyn(__file__)

@app.websocket("/web_socket")
async def handler(websocket):
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"Echo: {msg}")

app.start()

接收消息

receive_text() 方法会阻塞直到下一条消息到达。它由 Rust 的 tokio::mpsc 通道支持,因此 Python 处理程序在等待时不会持有 GIL。

当客户端断开连接时,receive_text() 会抛出 WebSocketDisconnect 异常。您可以显式捕获它,也可以让内部包装器静默处理。

接收消息

WebSocket
/web_socket
@app.websocket("/ws")
async def handler(websocket):
    try:
        while True:
            msg = await websocket.receive_text()
            await websocket.send_text(f"收到: {msg}")
    except WebSocketDisconnect:
        print(f"客户端 {websocket.id} 已断开")

发送消息

使用 send_text()send_json() 向当前客户端发送消息。所有发送方法都是异步的。

发送消息

WebSocket
/web_socket
@app.websocket("/ws")
async def handler(websocket):
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"Echo: {msg}")

广播

使用 broadcast() 方法向同一 WebSocket 端点上的所有已连接客户端发送消息。

广播

WebSocket
/chat
@app.websocket("/chat")
async def handler(websocket):
    while True:
        msg = await websocket.receive_text()
        # 向所有已连接的客户端发送
        await websocket.broadcast(f"用户 {websocket.id}: {msg}")
        # 仅向当前客户端发送确认
        await websocket.send_text("您的消息已发送")

查询参数

通过 websocket.query_params 访问 WebSocket 连接 URL 中的查询参数。

查询参数

WebSocket
/ws?name=gordon&role=commissioner
@app.websocket("/ws")
async def handler(websocket):
    name = websocket.query_params.get("name")
    role = websocket.query_params.get("role")

    if name == "gordon" and role == "commissioner":
        await websocket.broadcast("戈登已授权!")

    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"你好 {name}: {msg}")

便捷查询参数访问

除了手动调用 websocket.query_params.get(...) 之外,你还可以在处理函数、on_connecton_close 的签名中直接声明带类型注解的查询参数。Robyn 会自动解析并进行类型转换——与 HTTP 便捷参数访问的用法一致。

带默认值的参数是可选的。没有默认值的参数是必需的——如果缺少,连接将被拒绝并返回错误消息。

便捷查询参数

WebSocket
/ws?room=chat&page=5
@app.websocket("/ws")
async def handler(websocket, room: str = "default", page: int = 1):
    try:
        while True:
            msg = await websocket.receive_text()
            await websocket.send_text(
                f"room={room} page={page} msg={msg}"
            )
    except WebSocketDisconnect:
        pass

关闭连接

使用 websocket.close() 从服务端关闭 WebSocket 连接。该方法将:

  1. 关闭 WebSocket 连接。
  2. 从 WebSocket 注册表中移除客户端。
  3. 使任何挂起的 receive_text() 抛出 WebSocketDisconnect 异常。

关闭连接

WebSocket
/ws
@app.websocket("/ws")
async def handler(websocket):
    while True:
        msg = await websocket.receive_text()
        if msg == "quit":
            await websocket.close()
            break
        await websocket.send_text(f"收到: {msg}")

连接和关闭回调

您可以为 WebSocket 处理程序附加可选的 on_connecton_close 回调。它们是处理函数本身的装饰器。

  • on_connect 在新客户端连接时调用。其返回值作为第一条消息发送给客户端。
  • on_close 在连接关闭时调用。其返回值作为最后一条消息发送给客户端。

两个回调都接收一个 websocket 对象,可以访问 idquery_params。两者都是可选的。

回调

WebSocket
/chat
@app.websocket("/chat")
async def chat(websocket):
    while True:
        msg = await websocket.receive_text()
        await websocket.broadcast(msg)

@chat.on_connect
def on_connect(websocket):
    return f"欢迎,{websocket.id}!"

@chat.on_close
def on_close(websocket):
    return "再见!"

WebSocket API 参考

传递给处理程序的 websocket 对象提供以下方法和属性:

方法 / 属性描述

| await websocket.receive_text() | 阻塞直到下一条消息;连接关闭时抛出 WebSocketDisconnect | | await websocket.receive_bytes() | 阻塞直到下一条二进制消息;连接关闭时抛出 WebSocketDisconnect | | await websocket.receive_json() | 与 receive_text() 相同,但返回 JSON 解码后的数据 | | await websocket.send_text(data) | 向当前客户端发送文本 | | await websocket.send_bytes(data) | 向当前客户端发送二进制数据 | | await websocket.send_json(data) | 向当前客户端发送 JSON | | await websocket.broadcast(data) | 向此端点的所有客户端广播 | | await websocket.close() | 从服务端关闭连接 | | websocket.id | 连接 UUID 字符串 | | websocket.query_params | 连接 URL 中的查询参数 |

下一步

随着代码库的扩展,蝙蝠侠希望正义联盟的成员能够参与管理应用程序。

Robyn 向他介绍了应用扩展的最佳实践,并展示了如何通过视图和子路由器来提升代码的可读性和可维护性。