简介:本文详细解析在FastAPI中设置定时任务的多种方法,涵盖APScheduler、Celery、系统级定时任务等方案,提供完整代码示例与生产环境实践建议。
在现代化Web服务架构中,定时任务已成为不可或缺的组件。无论是数据同步、日志清理、缓存刷新还是业务报表生成,定时任务都承担着自动化执行的关键角色。FastAPI作为基于Starlette和Pydantic的高性能框架,虽然不内置定时任务功能,但通过与多种调度库的集成,可以轻松实现这一需求。
APScheduler是Python生态中最流行的定时任务库之一,其灵活性和易用性使其成为FastAPI定时任务的首选方案。
from fastapi import FastAPIfrom apscheduler.schedulers.background import BackgroundSchedulerimport loggingapp = FastAPI()scheduler = BackgroundScheduler()scheduler.start()def job_function():logging.info("定时任务执行中...")@app.on_event("startup")async def startup_event():scheduler.add_job(job_function,"interval",minutes=1,id="sample_job",replace_existing=True)@app.on_event("shutdown")async def shutdown_event():scheduler.shutdown()
interval: 固定间隔执行cron: 类crontab表达式date: 单次指定时间执行
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorejobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}scheduler = BackgroundScheduler(jobstores=jobstores)
def job_with_error_handling():try:# 业务逻辑passexcept Exception as e:logging.error(f"任务执行失败: {str(e)}")
对于需要分布式处理的复杂场景,Celery提供了更强大的解决方案。
pip install celery redis # 使用Redis作为broker
# celery_app.pyfrom celery import Celerycelery = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1')@celery.taskdef scheduled_task():# 任务逻辑return "任务完成"
from fastapi import FastAPIfrom celery_app import celeryapp = FastAPI()@app.get("/trigger")async def trigger_task():result = celery.send_task('tasks.scheduled_task')return {"task_id": result.id}
在Celery Beat配置文件中:
# celerybeat_schedule.pyfrom datetime import timedeltaCELERYBEAT_SCHEDULE = {'every-30-minutes': {'task': 'tasks.scheduled_task','schedule': timedelta(minutes=30)},}
对于简单场景,可以直接使用操作系统的定时任务功能。
# 编辑crontabcrontab -e# 添加以下内容(每分钟执行一次)* * * * * curl http://localhost:8000/api/task
import loggingfrom logging.handlers import RotatingFileHandlerlogger = logging.getLogger(__name__)handler = RotatingFileHandler('tasks.log', maxBytes=1000000, backupCount=3)logger.addHandler(handler)
from apscheduler.job import Jobdef add_unique_job(scheduler, func, trigger, **kwargs):existing_job = scheduler.get_job(func.__name__)if existing_job:existing_job.modify(**kwargs)else:scheduler.add_job(func, trigger, id=func.__name__, **kwargs)
project/├── main.py # FastAPI主程序├── tasks/│ ├── __init__.py│ ├── scheduler.py # APScheduler实现│ └── celery_tasks.py # Celery任务定义├── requirements.txt└── config.py # 配置管理
from pytz import utcscheduler = BackgroundScheduler(timezone=utc)# 或@scheduler.scheduled_job('cron', hour='8', timezone='Asia/Shanghai')
--preload
from fastapi import APIRouter, HTTPExceptionrouter = APIRouter()@router.post("/add-job")async def add_job(job_id: str, interval: int):try:scheduler.add_job(sample_task,'interval',minutes=interval,id=job_id)return {"status": "job added"}except Exception as e:raise HTTPException(status_code=400, detail=str(e))
from apscheduler.jobstores.mongodb import MongoDBJobStorejobstores = {'mongo': MongoDBJobStore(database='task_db',collection='jobs',host='mongodb://localhost:27017')}
| 方案 | 适用场景 | 复杂度 | 分布式支持 |
|---|---|---|---|
| APScheduler | 单机定时任务 | 低 | 否 |
| Celery | 分布式任务队列 | 中 | 是 |
| 系统定时任务 | 简单脚本执行 | 最低 | 否 |
| Airflow | 复杂工作流管理 | 高 | 是 |
选型建议:
通过合理选择定时任务方案,开发者可以充分发挥FastAPI的高性能特性,构建出稳定可靠的自动化任务系统。实际项目中,建议从APScheduler开始,随着系统复杂度增加再考虑引入Celery等更强大的解决方案。