数据模型和数据库连接

蝙蝠侠设计了一个数据模型,用于表示犯罪数据,包括罪行类型、嫌疑人及其位置等信息。他选择使用 SQLite 数据库来存储数据,并通过 ORM(对象关系映射)库与数据库进行交互。

# models.py
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = "sqlite:///./gotham_crime_data.db"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


Base = declarative_base()

class Crime(Base):
    __tablename__ = "crimes"

    id = Column(Integer, primary_key=True, index=True)
    type = Column(String, index=True)
    description = Column(String)
    location = Column(String)
    suspect_name = Column(String)
    date_time = Column(DateTime)
    latitude = Column(Float)
    longitude = Column(Float)

配置 Robyn 应用

蝙蝠侠成功创建了一个 Robyn 应用,并将其配置为使用数据库会话来访问 SQLite 数据库。

根据数据库模型,蝙蝠侠实现了一些辅助方法来与数据库进行交互。这些方法将用于在应用的 API 接口中执行 CRUD(增删改查)操作。

crud.py

# crud.py
from sqlalchemy.orm import Session
from .models import  Crime


def get_crime(db: Session, crime_id: int):
    return db.query(Crime).filter(Crime.id == crime_id).first()

def get_crimes(db: Session, skip: int = 0, limit: int = 100):
    return db.query(Crime).offset(skip).limit(limit).all()

def create_crime(db: Session, crime):
    db_crime = Crime(**crime)
    db.add(db_crime)
    db.commit()
    db.refresh(db_crime)
    return db_crime

def update_crime(db: Session, crime_id: int, crime):
    db_crime = get_crime(db, crime_id)
    if db_crime is None:
        return None
    for key, value in crime.items():
        setattr(db_crime, key, value)
    db.commit()
    db.refresh(db_crime)
    return db_crime

def delete_crime(db: Session, crime_id: int):
    db_crime = get_crime(db, crime_id)
    if db_crime is None:
        return False
    db.delete(db_crime)
    db.commit()
    return True

犯罪数据 API 接口

蝙蝠侠为管理犯罪数据创建了多个 API 接口。这些接口允许哥谭市警察局添加、更新和查询犯罪记录。

关于 defasync def 上面的 CRUD 辅助函数使用的是同步 SQLAlchemy,因此下面的处理器特意写成普通的 def 函数。Robyn 会在工作线程中运行同步处理器,这样其中的阻塞式数据库调用就不会阻塞事件循环。如果你更喜欢 async def 处理器,请搭配异步数据库驱动(例如 SQLAlchemy 的 asyncio 扩展,使用 create_async_engineawait session.execute(...)),让数据库 I/O 真正实现非阻塞。唯一要避免的组合,是在 async def 处理器中执行阻塞式的同步数据库调用——那会让事件循环为其他所有请求停滞。

Setting up Routes

# __main__.py
from robyn import Robyn
from robyn.robyn import Request, Response
from sqlalchemy.orm import Session

app = Robyn(__file__)

@app.post("/crimes")
def add_crime(request):
    with SessionLocal() as db:
        crime = request.json()
        insertion = crud.create_crime(db, crime)

    if insertion is None:
        raise Exception("Crime not added")

    return {
        "description": "Crime added successfully",
        "status_code": 200,
    }

@app.get("/crimes")
def get_crimes(request):
    with SessionLocal() as db:
        skip = int(request.query_params.get("skip", "0"))
        limit = int(request.query_params.get("limit", "100"))
        crimes = crud.get_crimes(db, skip=skip, limit=limit)

    return crimes

@app.get("/crimes/:crime_id", auth_required=True)
def get_crime(request):
    crime_id = int(request.path_params.get("crime_id"))
    with SessionLocal() as db:
        crime = crud.get_crime(db, crime_id=crime_id)

    if crime is None:
        raise Exception("Crime not found")

    return crime

@app.put("/crimes/:crime_id")
def update_crime(request):
    crime = request.json()
    crime_id = int(request.path_params.get("crime_id"))
    with SessionLocal() as db:
        updated_crime = crud.update_crime(db, crime_id=crime_id, crime=crime)
    if updated_crime is None:
        raise Exception("Crime not found")
    return updated_crime

@app.delete("/crimes/:crime_id")
def delete_crime(request):
    crime_id = int(request.path_params.get("crime_id"))
    with SessionLocal() as db:
        success = crud.delete_crime(db, crime_id=crime_id)
    if not success:
        raise Exception("Crime not found")
    return {"message": "Crime deleted successfully"}