简介:本文深入解析DolphinScheduler API文档,涵盖基础概念、核心接口、使用场景及实战示例,帮助开发者高效集成工作流引擎,提升任务调度自动化水平。
DolphinScheduler作为一款分布式易扩展的可视化工作流任务调度系统,其API文档是开发者与系统交互的核心桥梁。API文档通过标准化接口定义,实现了工作流创建、任务调度、状态监控等功能的程序化控制。相较于Web界面操作,API接口提供了更高的灵活性和自动化能力,尤其适用于需要集成到现有系统或构建自定义调度逻辑的场景。
DolphinScheduler API文档采用RESTful风格设计,核心接口围绕工作流(Workflow)、任务(Task)、项目(Project)等实体展开。文档结构清晰,包含以下关键部分:
通过API文档,开发者可实现:
接口:POST /dolphinscheduler/projects/{projectCode}/workflows
参数:
name:工作流名称(必填)description:描述信息dag:DAG定义(JSON格式,包含节点和边)globalParams:全局参数示例:
{"name": "daily_etl","description": "每日数据ETL流程","dag": {"nodes": [{"id": "1", "type": "SHELL", "name": "数据抽取", "params": "sh extract.sh"},{"id": "2", "type": "SPARK", "name": "数据转换", "params": "--master yarn..."}],"edges": [{"source": "1", "target": "2"}]},"globalParams": [{"paramKey": "date", "paramValue": "${sys.date}"}]}
接口:POST /dolphinscheduler/projects/{projectCode}/executors/start-process-instance
关键参数:
workflowCode:工作流定义IDscheduleTime:计划执行时间(可选)failureStrategy:失败策略(CONTINUE/END)使用场景:
通过定时任务或事件触发启动ETL流程,支持参数化调度(如传入${biz.date}动态日期)。
接口:GET /dolphinscheduler/projects/{projectCode}/executions/{executionCode}/tasks/{taskCode}
响应字段:
state:任务状态(SUCCESS/FAILURE/RUNNING)startTime/endTime:执行时间戳logPath:日志文件路径实战建议:
结合告警系统,当任务状态为FAILURE时触发邮件或企业微信通知。
接口:PUT /dolphinscheduler/projects/{projectCode}/executions/{executionCode}/tasks/{taskCode}/stop
注意事项:
终止操作不可逆,需确认任务无依赖后续流程时使用。
接口:POST /dolphinscheduler/projects
权限控制:
需PROJECT_CREATE权限,创建后默认生成projectCode(唯一标识)。
接口:POST /dolphinscheduler/users/{userId}/projects/{projectCode}/permissions
权限类型:
PROJECT_OPERATOR:可操作项目内工作流PROJECT_READ_ONLY:仅查看权限/dolphinscheduler/users/login获取令牌,有效期默认2小时api-server.conf中配置允许访问的IP范围503 Service Unavailable错误实施指数退避重试executionCode和taskCode追踪问题10001表示工作流不存在,需检查projectCode和workflowCode结合代码生成工具(如Python的networkx库),可根据业务规则动态构建DAG。例如:
import requestsimport jsondef create_dynamic_workflow(project_code, nodes, edges):url = f"http://ds-api:12345/dolphinscheduler/projects/{project_code}/workflows"data = {"name": "dynamic_workflow","dag": {"nodes": nodes, "edges": edges}}response = requests.post(url, json=data, headers={"Authorization": "Bearer <JWT>"})return response.json()
通过API将DolphinScheduler作为Airflow的子流程执行器,实现混合调度:
from airflow import DAGfrom airflow.operators.python import PythonOperatorimport requestsdef trigger_ds_workflow():url = "http://ds-api:12345/dolphinscheduler/projects/1/executors/start-process-instance"data = {"workflowCode": 101, "failureStrategy": "END"}requests.post(url, json=data, headers={"Authorization": "Bearer <JWT>"})dag = DAG("ds_integration", schedule_interval="@daily")task = PythonOperator(task_id="trigger_ds", python_callable=trigger_ds_workflow, dag=dag)
通过API获取失败任务列表,推送至Prometheus Alertmanager:
# alertmanager.ymlroutes:- receiver: ds-failurematch:alertname: "DS_Task_Failed"receivers:- name: ds-failurewebhook_configs:- url: "http://ds-api:12345/dolphinscheduler/alerts"
exp字段)Authorization: Bearer <token>/queue/get-running-task-count接口轮询提交状态globalParams传递项目间共享参数/dolphinscheduler/projects/{projectCode}/dependencies接口建立显式依赖关系DolphinScheduler API文档为开发者提供了强大的程序化控制能力,通过合理设计接口调用逻辑,可实现从简单任务调度到复杂工作流编排的全方位需求。未来版本中,预计会增强以下功能:
建议开发者定期关注官方文档更新,参与社区讨论(GitHub Issues),共同推动工作流调度技术的演进。