简介:本文详细讲解FastAPI中设置定时任务的多种方法,涵盖APScheduler、Celery、系统级定时任务等方案,提供完整代码示例与生产环境建议。
在构建现代化Web服务时,定时任务是不可或缺的功能模块。FastAPI作为高性能异步Web框架,虽然不直接提供定时任务功能,但通过与专业调度库的集成,可以轻松实现复杂的定时任务管理。本文将系统讲解FastAPI中设置定时任务的多种方案,帮助开发者根据业务需求选择最适合的实现方式。
APScheduler(Advanced Python Scheduler)是Python生态中最成熟的定时任务库之一,其轻量级特性使其成为FastAPI项目的首选方案。
通过BackgroundTasks与APScheduler结合,可以实现无阻塞的定时任务执行:
from fastapi import FastAPI, BackgroundTasksfrom apscheduler.schedulers.background import BackgroundSchedulerimport datetimeapp = FastAPI()scheduler = BackgroundScheduler()scheduler.add_job(func=lambda: print(f"任务执行于 {datetime.datetime.now()}"),trigger="interval",seconds=5)scheduler.start()@app.on_event("startup")def startup_event():scheduler.start()@app.on_event("shutdown")def shutdown_event():scheduler.shutdown()
对于生产环境,建议采用异步调度器并配置持久化存储:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}executors = {'default': ThreadPoolExecutor(20),'processpool': ProcessPoolExecutor(5)}scheduler = BackgroundScheduler(jobstores=jobstores,executors=executors,timezone="Asia/Shanghai")
通过API接口实现任务的动态添加与删除:
from pydantic import BaseModelclass TaskModel(BaseModel):id: strfunc_name: strinterval: int@app.post("/add-task")def add_task(task: TaskModel):scheduler.add_job(id=task.id,func=globals()[task.func_name],trigger="interval",seconds=task.interval)return {"status": "success"}@app.delete("/remove-task/{task_id}")def remove_task(task_id: str):scheduler.remove_job(task_id)return {"status": "success"}
对于需要分布式处理的高并发场景,Celery是更合适的选择。
# celery_app.pyfrom celery import Celerycelery = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1',include=['tasks'])celery.conf.beat_schedule = {'every-10-seconds': {'task': 'tasks.print_time','schedule': 10.0},}
# main.pyfrom fastapi import FastAPIfrom celery_app import celeryapp = FastAPI()@app.on_event("startup")async def startup_event():worker = celery.Worker(hostname='fastapi@%h',loglevel='INFO',pool='prefork')# 实际生产环境建议使用独立进程管理@app.get("/trigger-task")def trigger_task():celery.send_task('tasks.heavy_computation')return {"status": "task triggered"}
chunk_size参数分片result_expires避免结果堆积对于简单定时任务,系统级Cron仍是可靠选择:
# /etc/cron.d/fastapi_tasks* * * * * root curl -X POST http://localhost:8000/run-daily-task
更现代的Linux系统推荐使用systemd定时器:
# /etc/systemd/system/fastapi-task.timer[Unit]Description=Run daily FastAPI task[Timer]OnCalendar=*-*-* 03:00:00Persistent=true[Install]WantedBy=timers.target
# Dockerfile示例FROM python:3.9-slimWORKDIR /appCOPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txtCOPY . .CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "main:app"]
prometheus_client暴露任务执行指标
from apscheduler.job import Jobdef job_wrapper(func):def wrapper(*args, **kwargs):# 实现互斥锁逻辑passreturn wrapper@job_wrapperdef critical_task():# 关键任务实现
from pytz import timezonescheduler = BackgroundScheduler(timezone=timezone('Asia/Shanghai'))# 或在任务定义时指定scheduler.add_job(func=my_task,trigger='cron',hour=8,minute=30,timezone='Asia/Shanghai')
asyncio.gather并行执行IO密集型操作max_instances参数控制并发任务数
scheduler.add_job(id='data_processing',func=process_data,trigger='interval',minutes=30,max_instances=3 # 最多同时运行3个实例)
/project├── app/│ ├── __init__.py│ ├── main.py # FastAPI入口│ ├── scheduler.py # 调度器配置│ ├── tasks/ # 任务模块│ │ ├── __init__.py│ │ ├── data.py│ │ └── report.py├── tests/ # 测试目录└── requirements.txt
通过合理选择定时任务方案,可以显著提升FastAPI应用的自动化能力和运维效率。建议根据业务规模、任务复杂度和运维能力进行综合评估,初期可从APScheduler轻量级方案入手,随着业务发展逐步迁移到更复杂的分布式方案。