1. 面试题目 #
给定一个包含数据Schema的API文档,请使用AI工具在15分钟内生成符合RESTful规范的CRUD接口代码,并解释关键实现逻辑。请以Todo List应用为例,基于提供的API文档,设计并实现完整的后端服务,包括数据模型、路由处理、业务逻辑、错误处理等核心组件。
2. 参考答案 #
2.1 引言:快速API开发的核心要素 #
在15分钟内基于API文档生成完整的CRUD接口代码,需要重点关注代码结构清晰、功能完整、符合RESTful规范,并具备良好的可扩展性。本方案将以Todo List应用为例,展示如何快速构建一个生产就绪的API服务。
2.2 项目结构设计 #
# 项目目录结构
todo_api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI应用入口
│ ├── models/
│ │ ├── __init__.py
│ │ └── todo.py # 数据模型
│ ├── schemas/
│ │ ├── __init__.py
│ │ └── todo.py # Pydantic模式
│ ├── routers/
│ │ ├── __init__.py
│ │ └── todos.py # 路由处理
│ ├── services/
│ │ ├── __init__.py
│ │ └── todo_service.py # 业务逻辑
│ ├── database/
│ │ ├── __init__.py
│ │ └── connection.py # 数据库连接
│ └── utils/
│ ├── __init__.py
│ └── exceptions.py # 自定义异常
├── requirements.txt
└── README.md2.3 核心实现代码 #
2.3.1 数据模型定义 #
# app/models/todo.py
from sqlalchemy import Column, String, DateTime, Enum, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
import uuid
from enum import Enum as PyEnum
Base = declarative_base()
class TodoStatus(PyEnum):
PENDING = "pending"
COMPLETED = "completed"
class Todo(Base):
__tablename__ = "todos"
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
title = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
status = Column(Enum(TodoStatus), nullable=False, default=TodoStatus.PENDING)
due_date = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=func.now(), nullable=False)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=True)
def to_dict(self):
"""转换为字典格式"""
return {
"id": self.id,
"title": self.title,
"description": self.description,
"status": self.status.value,
"dueDate": self.due_date.isoformat() if self.due_date else None,
"createdAt": self.created_at.isoformat(),
"updatedAt": self.updated_at.isoformat() if self.updated_at else None
}2.3.2 Pydantic模式定义 #
# app/schemas/todo.py
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
from enum import Enum
class TodoStatus(str, Enum):
PENDING = "pending"
COMPLETED = "completed"
class TodoBase(BaseModel):
title: str = Field(..., min_length=1, max_length=100, description="任务标题")
description: Optional[str] = Field(None, description="任务描述")
due_date: Optional[datetime] = Field(None, description="截止日期")
@validator('due_date')
def validate_due_date(cls, v):
if v and v < datetime.now():
raise ValueError('截止日期不能早于当前时间')
return v
class TodoCreate(TodoBase):
"""创建Todo的请求模式"""
pass
class TodoUpdate(BaseModel):
"""更新Todo的请求模式"""
title: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
status: Optional[TodoStatus] = None
due_date: Optional[datetime] = None
@validator('due_date')
def validate_due_date(cls, v):
if v and v < datetime.now():
raise ValueError('截止日期不能早于当前时间')
return v
class TodoResponse(TodoBase):
"""Todo响应模式"""
id: str
status: TodoStatus
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class TodoListResponse(BaseModel):
"""Todo列表响应模式"""
total: int
page: int
limit: int
items: List[TodoResponse]
class BatchUpdateRequest(BaseModel):
"""批量更新请求模式"""
ids: List[str] = Field(..., min_items=1, description="要更新的Todo ID列表")
status: TodoStatus = Field(..., description="要设置的状态")
class BatchUpdateResponse(BaseModel):
"""批量更新响应模式"""
updated_count: int
class ErrorResponse(BaseModel):
"""错误响应模式"""
error: dict = Field(..., description="错误信息")
class Config:
schema_extra = {
"example": {
"error": {
"code": "INVALID_DUE_DATE",
"message": "截止日期不能早于当前时间"
}
}
}2.3.3 数据库连接配置 #
# app/database/connection.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import os
# 数据库配置
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./todo_app.db")
# 创建数据库引擎
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {},
poolclass=StaticPool if "sqlite" in DATABASE_URL else None
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
"""获取数据库会话"""
db = SessionLocal()
try:
yield db
finally:
db.close()
def create_tables():
"""创建数据库表"""
from app.models.todo import Base
Base.metadata.create_all(bind=engine)2.3.4 业务逻辑服务 #
# app/services/todo_service.py
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.models.todo import Todo, TodoStatus
from app.schemas.todo import TodoCreate, TodoUpdate, BatchUpdateRequest
from typing import List, Optional, Tuple
from datetime import datetime
class TodoService:
def __init__(self, db: Session):
self.db = db
def get_todos(
self,
status: Optional[str] = None,
page: int = 1,
limit: int = 20
) -> Tuple[List[Todo], int]:
"""获取Todo列表"""
query = self.db.query(Todo)
# 状态过滤
if status:
try:
status_enum = TodoStatus(status)
query = query.filter(Todo.status == status_enum)
except ValueError:
raise ValueError(f"无效的状态值: {status}")
# 分页
offset = (page - 1) * limit
todos = query.offset(offset).limit(limit).all()
total = query.count()
return todos, total
def get_todo_by_id(self, todo_id: str) -> Optional[Todo]:
"""根据ID获取Todo"""
return self.db.query(Todo).filter(Todo.id == todo_id).first()
def create_todo(self, todo_data: TodoCreate) -> Todo:
"""创建Todo"""
todo = Todo(
title=todo_data.title,
description=todo_data.description,
due_date=todo_data.due_date
)
self.db.add(todo)
self.db.commit()
self.db.refresh(todo)
return todo
def update_todo(self, todo_id: str, update_data: TodoUpdate) -> Optional[Todo]:
"""更新Todo"""
todo = self.get_todo_by_id(todo_id)
if not todo:
return None
# 更新字段
update_dict = update_data.dict(exclude_unset=True)
for field, value in update_dict.items():
if field == "due_date":
setattr(todo, "due_date", value)
else:
setattr(todo, field, value)
todo.updated_at = datetime.now()
self.db.commit()
self.db.refresh(todo)
return todo
def delete_todo(self, todo_id: str) -> bool:
"""删除Todo"""
todo = self.get_todo_by_id(todo_id)
if not todo:
return False
self.db.delete(todo)
self.db.commit()
return True
def batch_update_todos(self, batch_data: BatchUpdateRequest) -> int:
"""批量更新Todo状态"""
updated_count = self.db.query(Todo).filter(
Todo.id.in_(batch_data.ids)
).update(
{"status": batch_data.status.value, "updated_at": datetime.now()},
synchronize_session=False
)
self.db.commit()
return updated_count2.3.5 路由处理 #
# app/routers/todos.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from typing import Optional, List
from app.database.connection import get_db
from app.services.todo_service import TodoService
from app.schemas.todo import (
TodoCreate, TodoUpdate, TodoResponse, TodoListResponse,
BatchUpdateRequest, BatchUpdateResponse, ErrorResponse
)
router = APIRouter(prefix="/todos", tags=["todos"])
@router.get("/", response_model=TodoListResponse)
async def get_todos(
status: Optional[str] = Query(None, description="过滤状态: pending/completed"),
page: int = Query(1, ge=1, description="页码"),
limit: int = Query(20, ge=1, le=100, description="每页数量"),
db: Session = Depends(get_db)
):
"""获取所有Todo对象"""
try:
todo_service = TodoService(db)
todos, total = todo_service.get_todos(status, page, limit)
return TodoListResponse(
total=total,
page=page,
limit=limit,
items=[TodoResponse.from_orm(todo) for todo in todos]
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail="服务器内部错误")
@router.post("/", response_model=TodoResponse, status_code=status.HTTP_201_CREATED)
async def create_todo(
todo_data: TodoCreate,
db: Session = Depends(get_db)
):
"""创建Todo"""
try:
todo_service = TodoService(db)
todo = todo_service.create_todo(todo_data)
return TodoResponse.from_orm(todo)
except ValueError as e:
raise HTTPException(
status_code=400,
detail={
"error": {
"code": "VALIDATION_ERROR",
"message": str(e)
}
}
)
except Exception as e:
raise HTTPException(status_code=500, detail="服务器内部错误")
@router.get("/{todo_id}", response_model=TodoResponse)
async def get_todo(
todo_id: str,
db: Session = Depends(get_db)
):
"""获取单个Todo详情"""
todo_service = TodoService(db)
todo = todo_service.get_todo_by_id(todo_id)
if not todo:
raise HTTPException(
status_code=404,
detail={
"error": {
"code": "NOT_FOUND",
"message": "Todo not found"
}
}
)
return TodoResponse.from_orm(todo)
@router.patch("/{todo_id}", response_model=TodoResponse)
async def update_todo(
todo_id: str,
update_data: TodoUpdate,
db: Session = Depends(get_db)
):
"""更新Todo信息"""
try:
todo_service = TodoService(db)
todo = todo_service.update_todo(todo_id, update_data)
if not todo:
raise HTTPException(
status_code=404,
detail={
"error": {
"code": "NOT_FOUND",
"message": "Todo not found"
}
}
)
return TodoResponse.from_orm(todo)
except ValueError as e:
raise HTTPException(
status_code=400,
detail={
"error": {
"code": "VALIDATION_ERROR",
"message": str(e)
}
}
)
except Exception as e:
raise HTTPException(status_code=500, detail="服务器内部错误")
@router.delete("/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_todo(
todo_id: str,
db: Session = Depends(get_db)
):
"""删除Todo"""
todo_service = TodoService(db)
success = todo_service.delete_todo(todo_id)
if not success:
raise HTTPException(
status_code=404,
detail={
"error": {
"code": "NOT_FOUND",
"message": "Todo not found"
}
}
)
@router.post("/batch-update", response_model=BatchUpdateResponse)
async def batch_update_todos(
batch_data: BatchUpdateRequest,
db: Session = Depends(get_db)
):
"""批量更新Todo状态"""
try:
todo_service = TodoService(db)
updated_count = todo_service.batch_update_todos(batch_data)
return BatchUpdateResponse(updated_count=updated_count)
except Exception as e:
raise HTTPException(status_code=500, detail="服务器内部错误")2.3.6 主应用入口 #
# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.database.connection import create_tables
from app.routers import todos
import uvicorn
# 创建FastAPI应用
app = FastAPI(
title="Todo List API",
description="基于RESTful规范的Todo List API服务",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(todos.router)
# 全局异常处理
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return JSONResponse(
status_code=exc.status_code,
content=exc.detail
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
return JSONResponse(
status_code=500,
content={
"error": {
"code": "INTERNAL_SERVER_ERROR",
"message": "服务器内部错误"
}
}
)
# 启动事件
@app.on_event("startup")
async def startup_event():
"""应用启动时创建数据库表"""
create_tables()
print("数据库表创建完成")
# 根路径
@app.get("/")
async def root():
return {"message": "Todo List API", "version": "1.0.0"}
# 健康检查
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)2.3.7 依赖配置 #
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
pydantic==2.5.0
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.42.4 关键实现逻辑解释 #
2.4.1 数据验证与错误处理 #
# app/utils/exceptions.py
from fastapi import HTTPException
from typing import Dict, Any
class APIException(HTTPException):
"""自定义API异常"""
def __init__(self, status_code: int, error_code: str, message: str):
super().__init__(
status_code=status_code,
detail={
"error": {
"code": error_code,
"message": message
}
}
)
class ValidationError(APIException):
"""验证错误"""
def __init__(self, message: str):
super().__init__(400, "VALIDATION_ERROR", message)
class NotFoundError(APIException):
"""资源未找到错误"""
def __init__(self, resource: str = "Resource"):
super().__init__(404, "NOT_FOUND", f"{resource} not found")
class UnauthorizedError(APIException):
"""未授权错误"""
def __init__(self, message: str = "Unauthorized access"):
super().__init__(401, "UNAUTHORIZED", message)
class RateLimitError(APIException):
"""请求频率限制错误"""
def __init__(self, message: str = "Request frequency limit exceeded"):
super().__init__(429, "RATE_LIMIT_EXCEEDED", message)2.4.2 中间件与认证 #
# app/middleware/auth.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
import jwt
import os
security = HTTPBearer()
class AuthMiddleware:
def __init__(self):
self.secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
self.algorithm = "HS256"
def verify_token(self, credentials: HTTPAuthorizationCredentials) -> bool:
"""验证Bearer Token"""
try:
payload = jwt.decode(
credentials.credentials,
self.secret_key,
algorithms=[self.algorithm]
)
return True
except jwt.PyJWTError:
return False
async def __call__(self, request: Request, call_next):
# 跳过认证的路径
if request.url.path in ["/", "/health", "/docs", "/redoc", "/openapi.json"]:
return await call_next(request)
# 检查Authorization头
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise HTTPException(
status_code=401,
detail={
"error": {
"code": "UNAUTHORIZED",
"message": "Missing or invalid authorization header"
}
}
)
# 验证token
credentials = HTTPAuthorizationCredentials(
scheme="Bearer",
credentials=auth_header.split(" ")[1]
)
if not self.verify_token(credentials):
raise HTTPException(
status_code=401,
detail={
"error": {
"code": "UNAUTHORIZED",
"message": "Invalid token"
}
}
)
return await call_next(request)2.4.3 数据库迁移脚本 #
# migrations/init_db.py
from sqlalchemy import create_engine
from app.database.connection import DATABASE_URL
from app.models.todo import Base
import os
def init_database():
"""初始化数据库"""
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(bind=engine)
print("数据库初始化完成")
if __name__ == "__main__":
init_database()2.5 测试用例 #
# tests/test_todos.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.database.connection import get_db, SessionLocal
from app.models.todo import Base, Todo
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 测试数据库
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
@pytest.fixture(scope="module")
def setup_database():
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
def test_create_todo(setup_database):
"""测试创建Todo"""
response = client.post(
"/todos/",
json={
"title": "测试任务",
"description": "这是一个测试任务",
"due_date": "2024-12-31T23:59:59"
}
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "测试任务"
assert data["status"] == "pending"
def test_get_todos(setup_database):
"""测试获取Todo列表"""
response = client.get("/todos/")
assert response.status_code == 200
data = response.json()
assert "total" in data
assert "items" in data
def test_get_todo_by_id(setup_database):
"""测试根据ID获取Todo"""
# 先创建一个Todo
create_response = client.post(
"/todos/",
json={"title": "测试任务"}
)
todo_id = create_response.json()["id"]
# 获取Todo
response = client.get(f"/todos/{todo_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == todo_id
def test_update_todo(setup_database):
"""测试更新Todo"""
# 先创建一个Todo
create_response = client.post(
"/todos/",
json={"title": "测试任务"}
)
todo_id = create_response.json()["id"]
# 更新Todo
response = client.patch(
f"/todos/{todo_id}",
json={"status": "completed"}
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "completed"
def test_delete_todo(setup_database):
"""测试删除Todo"""
# 先创建一个Todo
create_response = client.post(
"/todos/",
json={"title": "测试任务"}
)
todo_id = create_response.json()["id"]
# 删除Todo
response = client.delete(f"/todos/{todo_id}")
assert response.status_code == 204
# 验证删除
get_response = client.get(f"/todos/{todo_id}")
assert get_response.status_code == 404
def test_batch_update_todos(setup_database):
"""测试批量更新Todo"""
# 创建多个Todo
todo_ids = []
for i in range(3):
response = client.post(
"/todos/",
json={"title": f"测试任务{i}"}
)
todo_ids.append(response.json()["id"])
# 批量更新
response = client.post(
"/todos/batch-update",
json={
"ids": todo_ids,
"status": "completed"
}
)
assert response.status_code == 200
data = response.json()
assert data["updated_count"] == 32.6 部署配置 #
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/todo_db
depends_on:
- db
db:
image: postgres:13
environment:
- POSTGRES_DB=todo_db
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
volumes:
postgres_data:# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]2.7 总结 #
通过上述实现,我们在15分钟内构建了一个完整的RESTful CRUD API服务,具备以下特点:
- 符合RESTful规范:使用标准的HTTP方法和状态码
- 完整的数据验证:使用Pydantic进行请求和响应验证
- 错误处理机制:统一的错误响应格式
- 数据库抽象:使用SQLAlchemy ORM
- 可测试性:提供完整的测试用例
- 可扩展性:模块化设计,易于扩展
- 文档化:自动生成API文档
这个实现展示了如何快速、高效地基于API文档构建生产就绪的后端服务。