简介:本文详细解析了FastAPI中设置定时任务的多种方法,包括APScheduler、Celery等主流方案,并提供完整代码示例与最佳实践建议。
在现代化Web服务架构中,定时任务已成为不可或缺的核心组件。FastAPI作为高性能异步框架,其定时任务功能广泛应用于:
相较于传统Cron方案,FastAPI的定时任务具有更强的灵活性和集成性。通过Python原生库或第三方扩展,开发者可以轻松实现毫秒级精度的任务调度,同时保持与FastAPI异步特性的完美兼容。
APScheduler是Python生态中最成熟的定时任务库之一,支持多种调度器类型:
from fastapi import FastAPIfrom apscheduler.schedulers.background import BackgroundSchedulerimport loggingapp = FastAPI()logger = logging.getLogger(__name__)scheduler = BackgroundScheduler()scheduler.add_job(func=lambda: logger.info("定时任务执行"),trigger="interval",seconds=10)scheduler.start()
misfire_grace_time处理任务延迟AsyncIOScheduler替代BackgroundScheduler以获得更好异步支持对于分布式任务队列需求,Celery是更合适的选择:
# celery_app.pyfrom celery import Celerycelery = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1')@celery.taskdef scheduled_task():print("Celery定时任务执行")
from fastapi import FastAPIfrom celery_app import celeryfrom celery.schedules import crontabapp = FastAPI()app.celery_app = celeryapp.celery_app.conf.beat_schedule = {'every-10-seconds': {'task': 'celery_app.scheduled_task','schedule': 10.0, # 每10秒执行},'daily-report': {'task': 'celery_app.daily_report','schedule': crontab(hour=8, minute=30), # 每天8:30执行}}
对于简单场景,HTTP轮询提供轻量级解决方案:
import asynciofrom fastapi import FastAPI, BackgroundTasksfrom datetime import datetimeapp = FastAPI()async def periodic_task():while True:print(f"任务执行时间: {datetime.now()}")await asyncio.sleep(10) # 每10秒执行@app.on_event("startup")async def startup_event():asyncio.create_task(periodic_task())
from fastapi import FastAPIimport httpximport asyncioapp = FastAPI()async def http_poller():async with httpx.AsyncClient() as client:while True:try:response = await client.post("http://localhost:8000/execute-task",json={"timestamp": datetime.now().isoformat()})print(f"任务执行结果: {response.status_code}")except Exception as e:print(f"任务执行失败: {str(e)}")await asyncio.sleep(30) # 每30秒轮询@app.post("/execute-task")async def execute_task(data: dict):# 实际任务逻辑return {"status": "success", "data": data}
from apscheduler.job import Job@app.post("/add-job")def add_job(job_id: str, interval: int):scheduler.add_job(id=job_id,func=execute_task,trigger="interval",seconds=interval)return {"status": "job added"}@app.delete("/remove-job/{job_id}")def remove_job(job_id: str):scheduler.remove_job(job_id)return {"status": "job removed"}
from apscheduler.triggers.chain import ChainTriggerdef task_a():print("执行任务A")def task_b():print("执行任务B")scheduler.add_job(task_a,trigger="interval",seconds=5,id="task_a")scheduler.add_job(task_b,trigger=ChainTrigger(job_id="task_a"),id="task_b")
随着Serverless架构的普及,FastAPI定时任务将向以下方向发展:
通过本文介绍的多种方案,开发者可以根据具体业务需求选择最适合的定时任务实现方式。在实际项目中,建议从简单方案开始,随着系统复杂度增加逐步引入更高级的分布式任务队列解决方案。