简介:本文详细讲解如何使用FastAPI快速开发Web API项目,并重点介绍如何高效连接MySQL数据库,包括环境配置、依赖安装、连接池优化及完整示例代码。
FastAPI作为现代Python Web框架,凭借其基于类型注解的自动文档生成、异步支持和高性能特性,已成为开发RESTful API的首选工具。而MySQL作为最流行的开源关系型数据库,其稳定性、事务支持和丰富的SQL功能为Web应用提供了可靠的数据存储方案。两者的结合能够快速构建出高性能、可扩展的Web服务,满足从初创项目到企业级应用的多样化需求。
FastAPI基于Starlette和Pydantic,在基准测试中展现出比Flask快2-3倍的请求处理能力。配合MySQL的InnoDB存储引擎,通过合理的索引设计和查询优化,可轻松支撑每秒数千次的API调用。这种组合特别适合需要低延迟响应的实时数据应用,如金融交易系统、物联网数据平台等。
FastAPI的自动API文档(Swagger UI和ReDoc)功能使前后端协作变得前所未有的高效。开发者只需定义Pydantic模型和路由函数,即可同时获得类型安全的接口和交互式文档。结合MySQL的ORM工具(如SQLAlchemy或Tortoise-ORM),可以大幅减少样板代码,将开发重心转移到业务逻辑实现上。
创建虚拟环境后,安装核心依赖:
pip install fastapi uvicorn[standard] sqlalchemy pymysql asyncmy
asyncmy:纯Python实现的异步MySQL驱动,比aiomysql性能更优sqlalchemy:功能全面的ORM工具,支持同步和异步模式tortoise-orm(异步优先ORM)或databases(数据库抽象层)创建MySQL测试数据库:
CREATE DATABASE fastapi_demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;CREATE USER 'fastapi_user'@'%' IDENTIFIED BY 'secure_password';GRANT ALL PRIVILEGES ON fastapi_demo.* TO 'fastapi_user'@'%';FLUSH PRIVILEGES;
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import sessionmakerfrom contextlib import asynccontextmanagerDATABASE_URL = "mysql+asyncmy://fastapi_user:secure_password@localhost/fastapi_demo"engine = create_async_engine(DATABASE_URL, echo=True)AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)@asynccontextmanagerasync def get_db():async with AsyncSessionLocal() as session:try:yield sessionawait session.commit()except Exception:await session.rollback()raise
# 推荐配置(根据实际负载调整)engine = create_async_engine(DATABASE_URL,pool_size=15, # 连接池大小max_overflow=10, # 超出pool_size后的最大连接数pool_timeout=30, # 获取连接超时时间(秒)pool_recycle=3600, # 连接回收时间(秒)echo=False # 生产环境设为False)
from sqlalchemy import Column, Integer, String, DateTimefrom sqlalchemy.ext.declarative import declarative_basefrom datetime import datetimeBase = declarative_base()class User(Base):__tablename__ = "users"id = Column(Integer, primary_key=True, index=True)username = Column(String(50), unique=True, index=True)email = Column(String(100), unique=True)created_at = Column(DateTime, default=datetime.utcnow)
from sqlalchemy import select, insert, update, deletefrom sqlalchemy.exc import IntegrityErrorclass UserCRUD:def __init__(self, db):self.db = dbasync def create_user(self, username: str, email: str):stmt = insert(User).values(username=username, email=email)try:await self.db.execute(stmt)await self.db.commit()return {"message": "User created successfully"}except IntegrityError:await self.db.rollback()return {"error": "Username or email already exists"}async def get_user(self, user_id: int):stmt = select(User).where(User.id == user_id)result = await self.db.execute(stmt)return result.scalar_one_or_none()
from fastapi import FastAPI, Depends, HTTPExceptionfrom pydantic import BaseModelapp = FastAPI()class UserRequest(BaseModel):username: stremail: str@app.post("/users/")async def create_user(user: UserRequest, db=Depends(get_db)):crud = UserCRUD(db)result = await crud.create_user(user.username, user.email)if "error" in result:raise HTTPException(status_code=400, detail=result["error"])return result@app.get("/users/{user_id}")async def read_user(user_id: int, db=Depends(get_db)):crud = UserCRUD(db)user = await crud.get_user(user_id)if user is None:raise HTTPException(status_code=404, detail="User not found")return user
executemany进行批量插入
# 高效分页示例async def get_users_paginated(db, last_id: int = None, limit: int = 10):query = select(User).order_by(User.id)if last_id:query = query.where(User.id > last_id)query = query.limit(limit)return await db.execute(query)
async def transfer_funds(db, from_id: int, to_id: int, amount: float):async with db.begin(): # 自动提交/回滚# 扣款操作await db.execute(update(Account).where(Account.id == from_id).values(balance=Account.balance - amount))# 存款操作await db.execute(update(Account).where(Account.id == to_id).values(balance=Account.balance + amount))
prometheus_client暴露API指标
@app.get("/health")async def health_check(db=Depends(get_db)):try:await db.execute(select(1))return {"status": "healthy"}except Exception as e:return {"status": "unhealthy", "error": str(e)}
# 调整连接参数engine = create_async_engine(DATABASE_URL,connect_args={"connect_timeout": 10,"read_timeout": 30,"write_timeout": 30})
确保数据库、表和连接都使用utf8mb4字符集,以完整支持emoji等4字节字符。
# 在数据库URL中指定时区DATABASE_URL = "mysql+asyncmy://user:pass@host/db?charset=utf8mb4&timezone=UTC"# 或在应用启动时设置from sqlalchemy import eventfrom sqlalchemy.engine import Engine@event.listens_for(engine, "connect")def set_timezone(dbapi_conn, connection_record):dbapi_conn.execute("SET time_zone='+00:00'")
使用Alembic管理Schema变更:
pip install alembicalembic init alembic
生成迁移脚本后执行:
alembic revision --autogenerate -m "add users table"alembic upgrade head
from databases import Databasedatabases = {"primary": Database("mysql+asyncmy://..."),"analytics": Database("mysql+asyncmy://analytics_user:...@host/analytics_db")}@app.get("/complex-report")async def get_report():async with databases["primary"].connection() as primary_conn:async with databases["analytics"].connection() as analytics_conn:# 跨库查询...
结合MySQL的binlog和Debezium实现CDC(变更数据捕获),构建实时数据管道。
FastAPI与MySQL的集成提供了构建现代Web API的强大工具链。通过异步编程模型、ORM工具和完善的连接管理,开发者可以快速实现高性能、可扩展的后端服务。未来发展方向包括:
建议开发者持续关注FastAPI和MySQL的最新版本更新,充分利用新特性提升开发效率和系统性能。在实际项目中,建议从简单CRUD开始,逐步引入缓存、消息队列等高级组件,构建健壮的企业级应用。