简介:本文详细讲解如何使用FastAPI框架与PostgreSQL数据库构建高性能API,涵盖环境配置、数据库建模、CRUD操作、API路由设计及异步处理等核心内容,提供完整可运行的代码示例。
在构建现代Web API时,技术栈的选择直接影响开发效率与系统性能。FastAPI作为基于Python的新型框架,凭借其ASGI特性、自动生成OpenAPI文档和类型注解支持,在微服务架构中展现出显著优势。PostgreSQL作为开源关系型数据库的标杆,提供JSONB、全文检索等高级功能,特别适合需要复杂查询的场景。
架构设计上采用三层结构:
这种分层设计遵循单一职责原则,便于后期维护与扩展。异步编程模型通过async/await实现非阻塞I/O操作,特别适合处理高并发场景。
pip install fastapi uvicorn[standard] asyncpg sqlalchemy databases postgresql
关键组件说明:
asyncpg:PostgreSQL的异步驱动,性能优于传统psycopg2databases:提供统一的异步数据库接口sqlalchemy:虽然使用Core模式,但保留了部分ORM特性创建PostgreSQL容器(开发环境推荐):
docker run --name api_db -e POSTGRES_PASSWORD=secret -p 5432:5432 -d postgres:14
初始化数据库脚本示例:
CREATE DATABASE fastapi_demo;CREATE TABLE items (id SERIAL PRIMARY KEY,name VARCHAR(100) NOT NULL,description TEXT,price NUMERIC(10, 2) CHECK (price > 0),created_at TIMESTAMP DEFAULT NOW());
from databases import DatabaseDATABASE_URL = "postgresql://postgres:secret@localhost/fastapi_demo"database = Database(DATABASE_URL)async def init_db():await database.connect()async def close_db():await database.disconnect()
连接池默认大小为10,通过max_connections参数可调整。生产环境建议配置连接超时和重试机制。
from pydantic import BaseModelfrom datetime import datetimeclass Item(BaseModel):name: strdescription: str | None = Noneprice: floatcreated_at: datetime | None = Noneclass ItemInDB(Item):id: int
使用Pydantic模型实现:
async def create_item(item: Item) -> dict:query = """INSERT INTO items (name, description, price)VALUES (:name, :description, :price)RETURNING id, name, description, price, created_at"""values = {"name": item.name,"description": item.description,"price": item.price}return await database.fetch_one(query, values)async def get_items(skip: int = 0, limit: int = 100) -> list:query = "SELECT * FROM items ORDER BY created_at DESC LIMIT :limit OFFSET :skip"return await database.fetch_all(query, {"skip": skip, "limit": limit})
关键优化点:
from fastapi import FastAPI, HTTPExceptionapp = FastAPI()@app.on_event("startup")async def startup():await init_db()@app.on_event("shutdown")async def shutdown():await close_db()@app.post("/items/", response_model=ItemInDB)async def create_item_endpoint(item: Item):db_item = await create_item(item)if not db_item:raise HTTPException(status_code=400, detail="Item creation failed")return db_item@app.get("/items/", response_model=list[ItemInDB])async def read_items(skip: int = 0, limit: int = 100):return await get_items(skip, limit)
依赖注入示例:
from fastapi import Dependsasync def get_db():try:yield databasefinally:await database.disconnect()@app.get("/items/{item_id}")async def read_item(item_id: int, db: Database = Depends(get_db)):query = "SELECT * FROM items WHERE id = :item_id"item = await db.fetch_one(query, {"item_id": item_id})if not item:raise HTTPException(status_code=404, detail="Item not found")return item
CREATE INDEX idx_items_name ON items (name);
max_body_size参数)
.├── main.py # 主入口文件├── models.py # 数据模型定义├── crud.py # 数据库操作├── schemas.py # Pydantic模型├── database.py # 数据库连接├── tests/ # 测试目录│ ├── test_api.py # API测试│ └── test_db.py # 数据库测试└── requirements.txt # 依赖文件
from contextlib import asynccontextmanager@asynccontextmanagerasync def lifespan(app: FastAPI):await init_db()yieldawait close_db()app = FastAPI(lifespan=lifespan)
async def transfer_funds(from_id: int, to_id: int, amount: float):async with database.transaction():# 执行多个相关操作pass
async def bulk_insert(items: list[Item]):query = "INSERT INTO items (name, description, price) VALUES "values = []params = []for item in items:values.append(f"(:name_{len(params)}, :description_{len(params)}, :price_{len(params)})")params.extend([{"name": f"name_{len(params)}", "value": item.name},{"name": f"description_{len(params)}", "value": item.description},{"name": f"price_{len(params)}", "value": item.price}])query += ",".join(values)return await database.execute(query, {p["name"]: p["value"] for p in params})
查询优化:
缓存策略:
from fastapi_cache import FastAPICachefrom fastapi_cache.backends.redis import RedisBackendfrom redis import asyncio as aioredisasync def init_cache():redis = aioredis.from_url("redis://localhost")FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
连接复用:
添加GraphQL支持:
from strawberry.fastapi import GraphQLRouterimport strawberry@strawberry.typeclass ItemType:id: intname: strschema = strawberry.Schema(Query)graphql_app = GraphQLRouter(schema)app.include_router(graphql_app, prefix="/graphql")
实现WebSocket:
from fastapi import WebSocket@app.websocket("/ws")async def websocket_endpoint(websocket: WebSocket):await websocket.accept()while True:data = await websocket.receive_text()await websocket.send_text(f"Message text was: {data}")
添加健康检查:
@app.get("/health")async def health_check():try:await database.execute("SELECT 1")return {"status": "healthy"}except Exception as e:return {"status": "unhealthy", "error": str(e)}
FastAPI与PostgreSQL的组合为现代API开发提供了高效、可靠的解决方案。通过异步编程模型,系统能够轻松处理数千并发连接。PostgreSQL的强大功能集(如地理空间支持、全文检索)使得复杂业务逻辑的实现变得简单。
未来发展方向:
开发者应持续关注ASGI生态的发展,特别是FastAPI 2.0版本带来的新特性。同时,PostgreSQL 15+版本提供的逻辑复制和更细粒度的权限控制,为构建企业级应用提供了更多可能性。