速查表

一份常见 Robyn 任务的快速复制粘贴参考。所有代码片段都基于当前发行版。如需了解任一主题的完整内容,请参阅左侧导航栏中的链接。

安装并运行一个最小应用

app.py

# pip install robyn
from robyn import Robyn

app = Robyn(__file__)

@app.get("/")
def index():
    return "Hello, world"

app.start(host="0.0.0.0", port=8080)  # host 可选,默认为 127.0.0.1

各种 HTTP 方法的路由

@app.get("/items")
def list_items(): ...

@app.post("/items")
def create_item(): ...

@app.put("/items/:id")
def replace_item(): ...

@app.patch("/items/:id")
def update_item(): ...

@app.delete("/items/:id")
def delete_item(): ...

# 还提供:@app.head、@app.options、@app.connect、@app.trace

路径参数(单个、可选、通配)

@app.get("/users/:id")
def get_user(request):
    return {"id": request.path_params["id"]}

@app.get("/posts/:id/:slug?")            # 可选的末尾段
def get_post(request):
    return request.path_params.get("slug", "")

@app.get("/files/*path")                 # 通配:匹配路径的其余部分
def read_file(request):
    return {"path": request.path_params["path"]}   # 例如 "img/2024/logo.png"

查询参数

@app.get("/search")
def search(request):
    q = request.query_params.get("q", "")          # 最后一个值,带默认值
    tags = request.query_params.get_all("tags")    # 所有值 -> list[str]
    return {"q": q, "tags": tags}

读取请求数据

@app.post("/inspect")
def inspect(request):
    data = request.json()                  # 解析后的 JSON 请求体 -> dict / list
    raw = request.body                     # 原始请求体(str | bytes)
    form = request.form_data               # dict[str, str]
    files = request.files                  # dict[str, bytes]
    content_type = request.headers.get("Content-Type")
    return {
        "method": request.method,
        "path": request.url.path,
        "ip": request.ip_addr,
    }

返回响应

from robyn import Response, status_codes

@app.get("/text")
def text():
    return "plain text"

@app.get("/json")
def json_body():
    return {"key": "value"}            # dict / list 会被自动序列化为 JSON

@app.get("/custom")
def custom():
    return Response(
        status_code=status_codes.HTTP_201_CREATED,
        headers={"Content-Type": "text/plain"},
        body="created",               # 也接受 `description=`(旧版别名)
    )

重定向

from robyn import Response

@app.get("/old")
def old():
    return Response(status_code=307, headers={"Location": "/new"}, body="")
from robyn import Response, Headers

@app.get("/login")
def login():
    response = Response(status_code=200, headers=Headers({}), body="ok")
    response.set_cookie(
        key="session", value="abc123", max_age=3600,
        path="/", http_only=True, secure=True, same_site="Strict",
    )
    return response

中间件(请求前 / 请求后)

@app.before_request()                  # 全局;传入路径即为按路由生效
def add_trace(request):
    request.headers.set("x-trace", "1")
    return request

@app.after_request("/")                # 按路由;可接收 (request, response)
def stamp(request, response):
    response.headers.set("x-served", "robyn")
    return response

身份验证

from robyn import Request
from robyn.authentication import AuthenticationHandler, BearerGetter, Identity

class Auth(AuthenticationHandler):
    def authenticate(self, request: Request) -> Identity | None:
        token = self.token_getter.get_token(request)   # 读取 "Bearer <token>"
        if token == "valid":
            return Identity(claims={"user": "bruce"})
        return None

app.configure_authentication(Auth(token_getter=BearerGetter()))

@app.get("/me", auth_required=True)
def me(request):
    return request.identity.claims     # 通过验证后即被填充

子路由

from robyn import SubRouter

api = SubRouter(prefix="/api/v1")

@api.get("/users")
def list_users():
    return {"users": []}

app.include_router(api)                # 路由挂载在 /api/v1/users

WebSocket

@app.websocket("/ws")
async def ws(websocket):
    while True:
        message = await websocket.receive_text()
        await websocket.send_text(f"echo: {message}")   # 也可:send_json(obj)
        await websocket.broadcast("to everyone")         # /ws 上的每个客户端

@ws.on_connect
def on_connect(websocket):
    return "Welcome!"

@ws.on_close
def on_close(websocket):
    return "Goodbye"

流式响应与服务器发送事件(SSE)

from robyn import StreamingResponse, SSEResponse, SSEMessage

@app.get("/sse")
def sse(request):
    def events():
        for i in range(3):
            yield SSEMessage(f"tick {i}", event="tick", id=str(i))
    return SSEResponse(events())       # text/event-stream

@app.get("/stream")
def stream(request):
    def chunks():
        yield b"chunk-1"
        yield b"chunk-2"
    return StreamingResponse(chunks(), media_type="application/octet-stream")

静态文件

from robyn import serve_file, serve_html

app.serve_directory(
    route="/static",
    directory_path="./assets",
    index_file="index.html",
    show_files_listing=False,
)

@app.get("/download")
def download():
    return serve_file("./report.pdf", file_name="report.pdf")   # 作为附件下载

@app.get("/page")
def page():
    return serve_html("./templates/index.html")

模板(Jinja2)

from robyn.templating import JinjaTemplate

template = JinjaTemplate("./templates")

@app.get("/hello")
def hello():
    return template.render_template(template_name="hello.html", name="Bruce")

抛出 HTTP 错误

from robyn.exceptions import HTTPException     # 注意:robyn.exceptions,而非 robyn

@app.get("/users/:id")
def get_user(request):
    if not request.path_params["id"].isdigit():
        raise HTTPException(400, "id must be numeric")   # (status_code, detail)
    return {"id": request.path_params["id"]}

常量请求(计算一次,在 Rust 侧缓存)

@app.get("/health", const=True)
def health():
    return {"status": "healthy"}

依赖注入

app.inject_global(DB="global-db")      # 对所有路由可用
app.inject(CACHE="router-cache")       # 对当前路由器的路由可用

@app.get("/data")
def data(request, global_dependencies, router_dependencies):
    return {
        "db": global_dependencies["DB"],
        "cache": router_dependencies["CACHE"],
    }

跨域资源共享(CORS)

from robyn import Robyn, ALLOW_CORS

app = Robyn(__file__)
ALLOW_CORS(app, origins=["http://localhost:3000"])   # 或 origins="*"

OpenAPI / Swagger

@app.get("/users", openapi_name="List Users", openapi_tags=["Users"])
def list_users():
    return {"users": []}

# Swagger UI 在 /docs 提供,规范在 /openapi.json 提供

生命周期事件

@app.startup_handler
async def on_startup():
    print("starting up")

@app.shutdown_handler
def on_shutdown():
    print("shutting down")

多核扩展与命令行参数

python app.py --processes 4 --workers 2    # 跨核心 / 线程分布
python app.py --fast                       # 自动调优进程数、工作线程数和日志级别
python app.py --dev                        # 文件变更时自动重载(单进程)
python app.py --log-level WARNING          # DEBUG / INFO / WARNING / ERROR