速查表
一份常见 Robyn 任务的快速复制粘贴参考。所有代码片段都基于当前发行版。如需了解任一主题的完整内容,请参阅左侧导航栏中的链接。
安装并运行一个最小应用
app.py
# pip install robyn
from robyn import Robyn
app = Robyn(__file__)
@app.get("/")
def index():
return "Hello, world"
app.start(host="0.0.0.0", port=8080) # host 可选,默认为 127.0.0.1
各种 HTTP 方法的路由
@app.get("/items")
def list_items(): ...
@app.post("/items")
def create_item(): ...
@app.put("/items/:id")
def replace_item(): ...
@app.patch("/items/:id")
def update_item(): ...
@app.delete("/items/:id")
def delete_item(): ...
# 还提供:@app.head、@app.options、@app.connect、@app.trace
路径参数(单个、可选、通配)
@app.get("/users/:id")
def get_user(request):
return {"id": request.path_params["id"]}
@app.get("/posts/:id/:slug?") # 可选的末尾段
def get_post(request):
return request.path_params.get("slug", "")
@app.get("/files/*path") # 通配:匹配路径的其余部分
def read_file(request):
return {"path": request.path_params["path"]} # 例如 "img/2024/logo.png"
查询参数
@app.get("/search")
def search(request):
q = request.query_params.get("q", "") # 最后一个值,带默认值
tags = request.query_params.get_all("tags") # 所有值 -> list[str]
return {"q": q, "tags": tags}
读取请求数据
@app.post("/inspect")
def inspect(request):
data = request.json() # 解析后的 JSON 请求体 -> dict / list
raw = request.body # 原始请求体(str | bytes)
form = request.form_data # dict[str, str]
files = request.files # dict[str, bytes]
content_type = request.headers.get("Content-Type")
return {
"method": request.method,
"path": request.url.path,
"ip": request.ip_addr,
}
返回响应
from robyn import Response, status_codes
@app.get("/text")
def text():
return "plain text"
@app.get("/json")
def json_body():
return {"key": "value"} # dict / list 会被自动序列化为 JSON
@app.get("/custom")
def custom():
return Response(
status_code=status_codes.HTTP_201_CREATED,
headers={"Content-Type": "text/plain"},
body="created", # 也接受 `description=`(旧版别名)
)
重定向
from robyn import Response
@app.get("/old")
def old():
return Response(status_code=307, headers={"Location": "/new"}, body="")
设置 Cookie
from robyn import Response, Headers
@app.get("/login")
def login():
response = Response(status_code=200, headers=Headers({}), body="ok")
response.set_cookie(
key="session", value="abc123", max_age=3600,
path="/", http_only=True, secure=True, same_site="Strict",
)
return response
中间件(请求前 / 请求后)
@app.before_request() # 全局;传入路径即为按路由生效
def add_trace(request):
request.headers.set("x-trace", "1")
return request
@app.after_request("/") # 按路由;可接收 (request, response)
def stamp(request, response):
response.headers.set("x-served", "robyn")
return response
身份验证
from robyn import Request
from robyn.authentication import AuthenticationHandler, BearerGetter, Identity
class Auth(AuthenticationHandler):
def authenticate(self, request: Request) -> Identity | None:
token = self.token_getter.get_token(request) # 读取 "Bearer <token>"
if token == "valid":
return Identity(claims={"user": "bruce"})
return None
app.configure_authentication(Auth(token_getter=BearerGetter()))
@app.get("/me", auth_required=True)
def me(request):
return request.identity.claims # 通过验证后即被填充
子路由
from robyn import SubRouter
api = SubRouter(prefix="/api/v1")
@api.get("/users")
def list_users():
return {"users": []}
app.include_router(api) # 路由挂载在 /api/v1/users
WebSocket
@app.websocket("/ws")
async def ws(websocket):
while True:
message = await websocket.receive_text()
await websocket.send_text(f"echo: {message}") # 也可:send_json(obj)
await websocket.broadcast("to everyone") # /ws 上的每个客户端
@ws.on_connect
def on_connect(websocket):
return "Welcome!"
@ws.on_close
def on_close(websocket):
return "Goodbye"
流式响应与服务器发送事件(SSE)
from robyn import StreamingResponse, SSEResponse, SSEMessage
@app.get("/sse")
def sse(request):
def events():
for i in range(3):
yield SSEMessage(f"tick {i}", event="tick", id=str(i))
return SSEResponse(events()) # text/event-stream
@app.get("/stream")
def stream(request):
def chunks():
yield b"chunk-1"
yield b"chunk-2"
return StreamingResponse(chunks(), media_type="application/octet-stream")
静态文件
from robyn import serve_file, serve_html
app.serve_directory(
route="/static",
directory_path="./assets",
index_file="index.html",
show_files_listing=False,
)
@app.get("/download")
def download():
return serve_file("./report.pdf", file_name="report.pdf") # 作为附件下载
@app.get("/page")
def page():
return serve_html("./templates/index.html")
模板(Jinja2)
from robyn.templating import JinjaTemplate
template = JinjaTemplate("./templates")
@app.get("/hello")
def hello():
return template.render_template(template_name="hello.html", name="Bruce")
抛出 HTTP 错误
from robyn.exceptions import HTTPException # 注意:robyn.exceptions,而非 robyn
@app.get("/users/:id")
def get_user(request):
if not request.path_params["id"].isdigit():
raise HTTPException(400, "id must be numeric") # (status_code, detail)
return {"id": request.path_params["id"]}
常量请求(计算一次,在 Rust 侧缓存)
@app.get("/health", const=True)
def health():
return {"status": "healthy"}
依赖注入
app.inject_global(DB="global-db") # 对所有路由可用
app.inject(CACHE="router-cache") # 对当前路由器的路由可用
@app.get("/data")
def data(request, global_dependencies, router_dependencies):
return {
"db": global_dependencies["DB"],
"cache": router_dependencies["CACHE"],
}
跨域资源共享(CORS)
from robyn import Robyn, ALLOW_CORS
app = Robyn(__file__)
ALLOW_CORS(app, origins=["http://localhost:3000"]) # 或 origins="*"
OpenAPI / Swagger
@app.get("/users", openapi_name="List Users", openapi_tags=["Users"])
def list_users():
return {"users": []}
# Swagger UI 在 /docs 提供,规范在 /openapi.json 提供
生命周期事件
@app.startup_handler
async def on_startup():
print("starting up")
@app.shutdown_handler
def on_shutdown():
print("shutting down")
多核扩展与命令行参数
python app.py --processes 4 --workers 2 # 跨核心 / 线程分布
python app.py --fast # 自动调优进程数、工作线程数和日志级别
python app.py --dev # 文件变更时自动重载(单进程)
python app.py --log-level WARNING # DEBUG / INFO / WARNING / ERROR
