WebSockets
在掌握了服务器发送事件的单向通信之后,蝙蝠侠意识到他需要更强大的功能。当戈登局长想要在危机情况下与他实时聊天时,蝙蝠侠需要双向通信。
"SSE 很适合向我的仪表板推送更新,"蝙蝠侠想道,"但我需要双向通信来与我的盟友协调!"
为了实现双向实时通信,蝙蝠侠学习了如何使用 Robyn 的现代装饰器 API 处理 WebSocket。底层消息通过 Rust 通道传递,实现最大性能——消息分发过程中无需 Python GIL。
Request
from robyn import Robyn
app = Robyn(__file__)
@app.websocket("/web_socket")
async def handler(websocket):
while True:
msg = await websocket.receive_text()
await websocket.send_text(f"Echo: {msg}")
app.start()
接收消息
receive_text() 方法会阻塞直到下一条消息到达。它由 Rust 的 tokio::mpsc 通道支持,因此 Python 处理程序在等待时不会持有 GIL。
当客户端断开连接时,receive_text() 会抛出 WebSocketDisconnect 异常。您可以显式捕获它,也可以让内部包装器静默处理。
接收消息
@app.websocket("/ws")
async def handler(websocket):
try:
while True:
msg = await websocket.receive_text()
await websocket.send_text(f"收到: {msg}")
except WebSocketDisconnect:
print(f"客户端 {websocket.id} 已断开")
发送消息
使用 send_text() 或 send_json() 向当前客户端发送消息。所有发送方法都是异步的。
发送消息
@app.websocket("/ws")
async def handler(websocket):
while True:
msg = await websocket.receive_text()
await websocket.send_text(f"Echo: {msg}")
广播
使用 broadcast() 方法向同一 WebSocket 端点上的所有已连接客户端发送消息。
广播
@app.websocket("/chat")
async def handler(websocket):
while True:
msg = await websocket.receive_text()
# 向所有已连接的客户端发送
await websocket.broadcast(f"用户 {websocket.id}: {msg}")
# 仅向当前客户端发送确认
await websocket.send_text("您的消息已发送")
查询参数
通过 websocket.query_params 访问 WebSocket 连接 URL 中的查询参数。
查询参数
@app.websocket("/ws")
async def handler(websocket):
name = websocket.query_params.get("name")
role = websocket.query_params.get("role")
if name == "gordon" and role == "commissioner":
await websocket.broadcast("戈登已授权!")
while True:
msg = await websocket.receive_text()
await websocket.send_text(f"你好 {name}: {msg}")
便捷查询参数访问
除了手动调用 websocket.query_params.get(...) 之外,你还可以在处理函数、on_connect 和 on_close 的签名中直接声明带类型注解的查询参数。Robyn 会自动解析并进行类型转换——与 HTTP 便捷参数访问的用法一致。
带默认值的参数是可选的。没有默认值的参数是必需的——如果缺少,连接将被拒绝并返回错误消息。
便捷查询参数
@app.websocket("/ws")
async def handler(websocket, room: str = "default", page: int = 1):
try:
while True:
msg = await websocket.receive_text()
await websocket.send_text(
f"room={room} page={page} msg={msg}"
)
except WebSocketDisconnect:
pass
关闭连接
使用 websocket.close() 从服务端关闭 WebSocket 连接。该方法将:
- 关闭 WebSocket 连接。
- 从 WebSocket 注册表中移除客户端。
- 使任何挂起的
receive_text()抛出WebSocketDisconnect异常。
关闭连接
@app.websocket("/ws")
async def handler(websocket):
while True:
msg = await websocket.receive_text()
if msg == "quit":
await websocket.close()
break
await websocket.send_text(f"收到: {msg}")
连接和关闭回调
您可以为 WebSocket 处理程序附加可选的 on_connect 和 on_close 回调。它们是处理函数本身的装饰器。
on_connect在新客户端连接时调用。其返回值作为第一条消息发送给客户端。on_close在连接关闭时调用。其返回值作为最后一条消息发送给客户端。
两个回调都接收一个 websocket 对象,可以访问 id 和 query_params。两者都是可选的。
回调
@app.websocket("/chat")
async def chat(websocket):
while True:
msg = await websocket.receive_text()
await websocket.broadcast(msg)
@chat.on_connect
def on_connect(websocket):
return f"欢迎,{websocket.id}!"
@chat.on_close
def on_close(websocket):
return "再见!"
WebSocket API 参考
传递给处理程序的 websocket 对象提供以下方法和属性:
| 方法 / 属性 | 描述 |
|---|
| await websocket.receive_text() | 阻塞直到下一条消息;连接关闭时抛出 WebSocketDisconnect |
| await websocket.receive_bytes() | 阻塞直到下一条二进制消息;连接关闭时抛出 WebSocketDisconnect |
| await websocket.receive_json() | 与 receive_text() 相同,但返回 JSON 解码后的数据 |
| await websocket.send_text(data) | 向当前客户端发送文本 |
| await websocket.send_bytes(data) | 向当前客户端发送二进制数据 |
| await websocket.send_json(data) | 向当前客户端发送 JSON |
| await websocket.broadcast(data) | 向此端点的所有客户端广播 |
| await websocket.close() | 从服务端关闭连接 |
| websocket.id | 连接 UUID 字符串 |
| websocket.query_params | 连接 URL 中的查询参数 |
下一步
随着代码库的扩展,蝙蝠侠希望正义联盟的成员能够参与管理应用程序。
Robyn 向他介绍了应用扩展的最佳实践,并展示了如何通过视图和子路由器来提升代码的可读性和可维护性。
