ai
  • outline
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 1. 面试题目
  • 2. 参考答案
    • 2.1 引言:快速API开发的核心要素
    • 2.2 项目结构设计
    • 2.3 核心实现代码
      • 2.3.1 数据模型定义
      • 2.3.2 Pydantic模式定义
      • 2.3.3 数据库连接配置
      • 2.3.4 业务逻辑服务
      • 2.3.5 路由处理
      • 2.3.6 主应用入口
      • 2.3.7 依赖配置
    • 2.4 关键实现逻辑解释
      • 2.4.1 数据验证与错误处理
      • 2.4.2 中间件与认证
      • 2.4.3 数据库迁移脚本
    • 2.5 测试用例
    • 2.6 部署配置
    • 2.7 总结

1. 面试题目 #

给定一个包含数据Schema的API文档,请使用AI工具在15分钟内生成符合RESTful规范的CRUD接口代码,并解释关键实现逻辑。请以Todo List应用为例,基于提供的API文档,设计并实现完整的后端服务,包括数据模型、路由处理、业务逻辑、错误处理等核心组件。

2. 参考答案 #

2.1 引言:快速API开发的核心要素 #

在15分钟内基于API文档生成完整的CRUD接口代码,需要重点关注代码结构清晰、功能完整、符合RESTful规范,并具备良好的可扩展性。本方案将以Todo List应用为例,展示如何快速构建一个生产就绪的API服务。

2.2 项目结构设计 #

# 项目目录结构
todo_api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI应用入口
│   ├── models/
│   │   ├── __init__.py
│   │   └── todo.py          # 数据模型
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── todo.py          # Pydantic模式
│   ├── routers/
│   │   ├── __init__.py
│   │   └── todos.py         # 路由处理
│   ├── services/
│   │   ├── __init__.py
│   │   └── todo_service.py  # 业务逻辑
│   ├── database/
│   │   ├── __init__.py
│   │   └── connection.py    # 数据库连接
│   └── utils/
│       ├── __init__.py
│       └── exceptions.py    # 自定义异常
├── requirements.txt
└── README.md

2.3 核心实现代码 #

2.3.1 数据模型定义 #

# app/models/todo.py
from sqlalchemy import Column, String, DateTime, Enum, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
import uuid
from enum import Enum as PyEnum

Base = declarative_base()

class TodoStatus(PyEnum):
    PENDING = "pending"
    COMPLETED = "completed"

class Todo(Base):
    __tablename__ = "todos"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    title = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(Enum(TodoStatus), nullable=False, default=TodoStatus.PENDING)
    due_date = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=func.now(), nullable=False)
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=True)

    def to_dict(self):
        """转换为字典格式"""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "status": self.status.value,
            "dueDate": self.due_date.isoformat() if self.due_date else None,
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat() if self.updated_at else None
        }

2.3.2 Pydantic模式定义 #

# app/schemas/todo.py
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from datetime import datetime
from enum import Enum

class TodoStatus(str, Enum):
    PENDING = "pending"
    COMPLETED = "completed"

class TodoBase(BaseModel):
    title: str = Field(..., min_length=1, max_length=100, description="任务标题")
    description: Optional[str] = Field(None, description="任务描述")
    due_date: Optional[datetime] = Field(None, description="截止日期")

    @validator('due_date')
    def validate_due_date(cls, v):
        if v and v < datetime.now():
            raise ValueError('截止日期不能早于当前时间')
        return v

class TodoCreate(TodoBase):
    """创建Todo的请求模式"""
    pass

class TodoUpdate(BaseModel):
    """更新Todo的请求模式"""
    title: Optional[str] = Field(None, min_length=1, max_length=100)
    description: Optional[str] = None
    status: Optional[TodoStatus] = None
    due_date: Optional[datetime] = None

    @validator('due_date')
    def validate_due_date(cls, v):
        if v and v < datetime.now():
            raise ValueError('截止日期不能早于当前时间')
        return v

class TodoResponse(TodoBase):
    """Todo响应模式"""
    id: str
    status: TodoStatus
    created_at: datetime
    updated_at: Optional[datetime] = None

    class Config:
        from_attributes = True

class TodoListResponse(BaseModel):
    """Todo列表响应模式"""
    total: int
    page: int
    limit: int
    items: List[TodoResponse]

class BatchUpdateRequest(BaseModel):
    """批量更新请求模式"""
    ids: List[str] = Field(..., min_items=1, description="要更新的Todo ID列表")
    status: TodoStatus = Field(..., description="要设置的状态")

class BatchUpdateResponse(BaseModel):
    """批量更新响应模式"""
    updated_count: int

class ErrorResponse(BaseModel):
    """错误响应模式"""
    error: dict = Field(..., description="错误信息")

    class Config:
        schema_extra = {
            "example": {
                "error": {
                    "code": "INVALID_DUE_DATE",
                    "message": "截止日期不能早于当前时间"
                }
            }
        }

2.3.3 数据库连接配置 #

# app/database/connection.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
import os

# 数据库配置
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./todo_app.db")

# 创建数据库引擎
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {},
    poolclass=StaticPool if "sqlite" in DATABASE_URL else None
)

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    """获取数据库会话"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def create_tables():
    """创建数据库表"""
    from app.models.todo import Base
    Base.metadata.create_all(bind=engine)

2.3.4 业务逻辑服务 #

# app/services/todo_service.py
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.models.todo import Todo, TodoStatus
from app.schemas.todo import TodoCreate, TodoUpdate, BatchUpdateRequest
from typing import List, Optional, Tuple
from datetime import datetime

class TodoService:
    def __init__(self, db: Session):
        self.db = db

    def get_todos(
        self, 
        status: Optional[str] = None, 
        page: int = 1, 
        limit: int = 20
    ) -> Tuple[List[Todo], int]:
        """获取Todo列表"""
        query = self.db.query(Todo)

        # 状态过滤
        if status:
            try:
                status_enum = TodoStatus(status)
                query = query.filter(Todo.status == status_enum)
            except ValueError:
                raise ValueError(f"无效的状态值: {status}")

        # 分页
        offset = (page - 1) * limit
        todos = query.offset(offset).limit(limit).all()
        total = query.count()

        return todos, total

    def get_todo_by_id(self, todo_id: str) -> Optional[Todo]:
        """根据ID获取Todo"""
        return self.db.query(Todo).filter(Todo.id == todo_id).first()

    def create_todo(self, todo_data: TodoCreate) -> Todo:
        """创建Todo"""
        todo = Todo(
            title=todo_data.title,
            description=todo_data.description,
            due_date=todo_data.due_date
        )

        self.db.add(todo)
        self.db.commit()
        self.db.refresh(todo)

        return todo

    def update_todo(self, todo_id: str, update_data: TodoUpdate) -> Optional[Todo]:
        """更新Todo"""
        todo = self.get_todo_by_id(todo_id)
        if not todo:
            return None

        # 更新字段
        update_dict = update_data.dict(exclude_unset=True)
        for field, value in update_dict.items():
            if field == "due_date":
                setattr(todo, "due_date", value)
            else:
                setattr(todo, field, value)

        todo.updated_at = datetime.now()

        self.db.commit()
        self.db.refresh(todo)

        return todo

    def delete_todo(self, todo_id: str) -> bool:
        """删除Todo"""
        todo = self.get_todo_by_id(todo_id)
        if not todo:
            return False

        self.db.delete(todo)
        self.db.commit()

        return True

    def batch_update_todos(self, batch_data: BatchUpdateRequest) -> int:
        """批量更新Todo状态"""
        updated_count = self.db.query(Todo).filter(
            Todo.id.in_(batch_data.ids)
        ).update(
            {"status": batch_data.status.value, "updated_at": datetime.now()},
            synchronize_session=False
        )

        self.db.commit()
        return updated_count

2.3.5 路由处理 #

# app/routers/todos.py
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from typing import Optional, List
from app.database.connection import get_db
from app.services.todo_service import TodoService
from app.schemas.todo import (
    TodoCreate, TodoUpdate, TodoResponse, TodoListResponse,
    BatchUpdateRequest, BatchUpdateResponse, ErrorResponse
)

router = APIRouter(prefix="/todos", tags=["todos"])

@router.get("/", response_model=TodoListResponse)
async def get_todos(
    status: Optional[str] = Query(None, description="过滤状态: pending/completed"),
    page: int = Query(1, ge=1, description="页码"),
    limit: int = Query(20, ge=1, le=100, description="每页数量"),
    db: Session = Depends(get_db)
):
    """获取所有Todo对象"""
    try:
        todo_service = TodoService(db)
        todos, total = todo_service.get_todos(status, page, limit)

        return TodoListResponse(
            total=total,
            page=page,
            limit=limit,
            items=[TodoResponse.from_orm(todo) for todo in todos]
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器内部错误")

@router.post("/", response_model=TodoResponse, status_code=status.HTTP_201_CREATED)
async def create_todo(
    todo_data: TodoCreate,
    db: Session = Depends(get_db)
):
    """创建Todo"""
    try:
        todo_service = TodoService(db)
        todo = todo_service.create_todo(todo_data)
        return TodoResponse.from_orm(todo)
    except ValueError as e:
        raise HTTPException(
            status_code=400, 
            detail={
                "error": {
                    "code": "VALIDATION_ERROR",
                    "message": str(e)
                }
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器内部错误")

@router.get("/{todo_id}", response_model=TodoResponse)
async def get_todo(
    todo_id: str,
    db: Session = Depends(get_db)
):
    """获取单个Todo详情"""
    todo_service = TodoService(db)
    todo = todo_service.get_todo_by_id(todo_id)

    if not todo:
        raise HTTPException(
            status_code=404,
            detail={
                "error": {
                    "code": "NOT_FOUND",
                    "message": "Todo not found"
                }
            }
        )

    return TodoResponse.from_orm(todo)

@router.patch("/{todo_id}", response_model=TodoResponse)
async def update_todo(
    todo_id: str,
    update_data: TodoUpdate,
    db: Session = Depends(get_db)
):
    """更新Todo信息"""
    try:
        todo_service = TodoService(db)
        todo = todo_service.update_todo(todo_id, update_data)

        if not todo:
            raise HTTPException(
                status_code=404,
                detail={
                    "error": {
                        "code": "NOT_FOUND",
                        "message": "Todo not found"
                    }
                }
            )

        return TodoResponse.from_orm(todo)
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail={
                "error": {
                    "code": "VALIDATION_ERROR",
                    "message": str(e)
                }
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器内部错误")

@router.delete("/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_todo(
    todo_id: str,
    db: Session = Depends(get_db)
):
    """删除Todo"""
    todo_service = TodoService(db)
    success = todo_service.delete_todo(todo_id)

    if not success:
        raise HTTPException(
            status_code=404,
            detail={
                "error": {
                    "code": "NOT_FOUND",
                    "message": "Todo not found"
                }
            }
        )

@router.post("/batch-update", response_model=BatchUpdateResponse)
async def batch_update_todos(
    batch_data: BatchUpdateRequest,
    db: Session = Depends(get_db)
):
    """批量更新Todo状态"""
    try:
        todo_service = TodoService(db)
        updated_count = todo_service.batch_update_todos(batch_data)

        return BatchUpdateResponse(updated_count=updated_count)
    except Exception as e:
        raise HTTPException(status_code=500, detail="服务器内部错误")

2.3.6 主应用入口 #

# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.database.connection import create_tables
from app.routers import todos
import uvicorn

# 创建FastAPI应用
app = FastAPI(
    title="Todo List API",
    description="基于RESTful规范的Todo List API服务",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# 添加CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(todos.router)

# 全局异常处理
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.detail
    )

@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "服务器内部错误"
            }
        }
    )

# 启动事件
@app.on_event("startup")
async def startup_event():
    """应用启动时创建数据库表"""
    create_tables()
    print("数据库表创建完成")

# 根路径
@app.get("/")
async def root():
    return {"message": "Todo List API", "version": "1.0.0"}

# 健康检查
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

2.3.7 依赖配置 #

# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
pydantic==2.5.0
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4

2.4 关键实现逻辑解释 #

2.4.1 数据验证与错误处理 #

# app/utils/exceptions.py
from fastapi import HTTPException
from typing import Dict, Any

class APIException(HTTPException):
    """自定义API异常"""
    def __init__(self, status_code: int, error_code: str, message: str):
        super().__init__(
            status_code=status_code,
            detail={
                "error": {
                    "code": error_code,
                    "message": message
                }
            }
        )

class ValidationError(APIException):
    """验证错误"""
    def __init__(self, message: str):
        super().__init__(400, "VALIDATION_ERROR", message)

class NotFoundError(APIException):
    """资源未找到错误"""
    def __init__(self, resource: str = "Resource"):
        super().__init__(404, "NOT_FOUND", f"{resource} not found")

class UnauthorizedError(APIException):
    """未授权错误"""
    def __init__(self, message: str = "Unauthorized access"):
        super().__init__(401, "UNAUTHORIZED", message)

class RateLimitError(APIException):
    """请求频率限制错误"""
    def __init__(self, message: str = "Request frequency limit exceeded"):
        super().__init__(429, "RATE_LIMIT_EXCEEDED", message)

2.4.2 中间件与认证 #

# app/middleware/auth.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
import jwt
import os

security = HTTPBearer()

class AuthMiddleware:
    def __init__(self):
        self.secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key")
        self.algorithm = "HS256"

    def verify_token(self, credentials: HTTPAuthorizationCredentials) -> bool:
        """验证Bearer Token"""
        try:
            payload = jwt.decode(
                credentials.credentials, 
                self.secret_key, 
                algorithms=[self.algorithm]
            )
            return True
        except jwt.PyJWTError:
            return False

    async def __call__(self, request: Request, call_next):
        # 跳过认证的路径
        if request.url.path in ["/", "/health", "/docs", "/redoc", "/openapi.json"]:
            return await call_next(request)

        # 检查Authorization头
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(
                status_code=401,
                detail={
                    "error": {
                        "code": "UNAUTHORIZED",
                        "message": "Missing or invalid authorization header"
                    }
                }
            )

        # 验证token
        credentials = HTTPAuthorizationCredentials(
            scheme="Bearer",
            credentials=auth_header.split(" ")[1]
        )

        if not self.verify_token(credentials):
            raise HTTPException(
                status_code=401,
                detail={
                    "error": {
                        "code": "UNAUTHORIZED",
                        "message": "Invalid token"
                    }
                }
            )

        return await call_next(request)

2.4.3 数据库迁移脚本 #

# migrations/init_db.py
from sqlalchemy import create_engine
from app.database.connection import DATABASE_URL
from app.models.todo import Base
import os

def init_database():
    """初始化数据库"""
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(bind=engine)
    print("数据库初始化完成")

if __name__ == "__main__":
    init_database()

2.5 测试用例 #

# tests/test_todos.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.database.connection import get_db, SessionLocal
from app.models.todo import Base, Todo
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 测试数据库
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def override_get_db():
    try:
        db = TestingSessionLocal()
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db

client = TestClient(app)

@pytest.fixture(scope="module")
def setup_database():
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)

def test_create_todo(setup_database):
    """测试创建Todo"""
    response = client.post(
        "/todos/",
        json={
            "title": "测试任务",
            "description": "这是一个测试任务",
            "due_date": "2024-12-31T23:59:59"
        }
    )
    assert response.status_code == 201
    data = response.json()
    assert data["title"] == "测试任务"
    assert data["status"] == "pending"

def test_get_todos(setup_database):
    """测试获取Todo列表"""
    response = client.get("/todos/")
    assert response.status_code == 200
    data = response.json()
    assert "total" in data
    assert "items" in data

def test_get_todo_by_id(setup_database):
    """测试根据ID获取Todo"""
    # 先创建一个Todo
    create_response = client.post(
        "/todos/",
        json={"title": "测试任务"}
    )
    todo_id = create_response.json()["id"]

    # 获取Todo
    response = client.get(f"/todos/{todo_id}")
    assert response.status_code == 200
    data = response.json()
    assert data["id"] == todo_id

def test_update_todo(setup_database):
    """测试更新Todo"""
    # 先创建一个Todo
    create_response = client.post(
        "/todos/",
        json={"title": "测试任务"}
    )
    todo_id = create_response.json()["id"]

    # 更新Todo
    response = client.patch(
        f"/todos/{todo_id}",
        json={"status": "completed"}
    )
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "completed"

def test_delete_todo(setup_database):
    """测试删除Todo"""
    # 先创建一个Todo
    create_response = client.post(
        "/todos/",
        json={"title": "测试任务"}
    )
    todo_id = create_response.json()["id"]

    # 删除Todo
    response = client.delete(f"/todos/{todo_id}")
    assert response.status_code == 204

    # 验证删除
    get_response = client.get(f"/todos/{todo_id}")
    assert get_response.status_code == 404

def test_batch_update_todos(setup_database):
    """测试批量更新Todo"""
    # 创建多个Todo
    todo_ids = []
    for i in range(3):
        response = client.post(
            "/todos/",
            json={"title": f"测试任务{i}"}
        )
        todo_ids.append(response.json()["id"])

    # 批量更新
    response = client.post(
        "/todos/batch-update",
        json={
            "ids": todo_ids,
            "status": "completed"
        }
    )
    assert response.status_code == 200
    data = response.json()
    assert data["updated_count"] == 3

2.6 部署配置 #

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/todo_db
    depends_on:
      - db

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=todo_db
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

2.7 总结 #

通过上述实现,我们在15分钟内构建了一个完整的RESTful CRUD API服务,具备以下特点:

  1. 符合RESTful规范:使用标准的HTTP方法和状态码
  2. 完整的数据验证:使用Pydantic进行请求和响应验证
  3. 错误处理机制:统一的错误响应格式
  4. 数据库抽象:使用SQLAlchemy ORM
  5. 可测试性:提供完整的测试用例
  6. 可扩展性:模块化设计,易于扩展
  7. 文档化:自动生成API文档

这个实现展示了如何快速、高效地基于API文档构建生产就绪的后端服务。

访问验证

请输入访问令牌

Token不正确,请重新输入