创建工作流
接口描述
在指定工作空间下创建工作流
权限说明
用户在工作空间下创建工作流,需具有工作空间下新建工作流WORKFLOW_CREATE
权限
注意事项
- 工作空间不存在时,创建工作流会失败
- 如果当前用户没有新建工作流
WORKFLOW_CREATE
权限,创建工作流会失败
请求结构
1POST /v2/workspaces/{workspaceId}/job HTTP/1.1
2Host: databuilder.bd.baidubce.com
3Authorization: authorization string
4
5{
6 "name": "job_example",
7 "description": "示例作业描述",
8 "tasks": [
9 {
10 "id": "tid-ray",
11 "name": "RayTask",
12 "description": "Ray 任务示例",
13 "clusterList": [
14 {
15 "clusterId": "compute_xxxxx",
16 "engineType": "RAY",
17 "clusterType": "RESIDENT"
18 }
19 ],
20 "rayTask": {
21 "taskParam": {
22 "codePath": "dir1/dir2/main.py",
23 "entryPoint": "python main.py",
24 "runtimeEnv": [
25 {
26 "key": "ENV1",
27 "value": "value1"
28 }
29 ]
30 }
31 },
32 "compoundTask": {
33 "taskParam": {
34 "parallel": 1
35 },
36 "operators": [
37 {
38 "id": "opid-1",
39 "name": "opName1",
40 "params": [
41 {
42 "key": "paramKey1",
43 "value": "paramValue1"
44 }
45 ],
46 "metaData": {
47 "catalogName": "catalog1",
48 "schemaName": "schema1",
49 "operatorName": "operator1",
50 "version": "v1"
51 },
52 "dependsOn": []
53 },
54 {
55 "id": "opid-2",
56 "name": "opName2",
57 "params": [
58 {
59 "key": "paramKey2",
60 "value": "paramValue2"
61 }
62 ],
63 "metaData": {
64 "catalogName": "catalog2",
65 "schemaName": "schema2",
66 "operatorName": "operator2",
67 "version": "v1"
68 },
69 "dependsOn": [
70 "opid-1"
71 ]
72 }
73 ]
74 },
75 "sparkTask": {
76 "sparkTaskParam": {
77 "mainClass": "com.example.MainClass",
78 "dependentLibraries": [
79 "lib1.jar"
80 ],
81 "mainClassArgs": [
82 "arg1"
83 ],
84 "envVars": [
85 {
86 "key": "SPARK_ENV",
87 "value": "production"
88 }
89 ],
90 "sparkConf": [
91 {
92 "key": "spark.executor.memory",
93 "value": "4g"
94 }
95 ],
96 "templateId": "template_01"
97 }
98 },
99 "pySparkTask": {
100 "pySparkTaskParam": {
101 "mainApplicationPyFile": "dir1/dir2/main.py",
102 "dependentLibraries": [
103 "pyspark_lib1.zip"
104 ],
105 "mainClassArgs": [
106 "arg1"
107 ],
108 "envVars": [
109 {
110 "key": "PYSPARK_ENV",
111 "value": "dev"
112 }
113 ],
114 "templateId": "template_pyspark",
115 "sparkConf": [
116 {
117 "key": "spark.executor.cores",
118 "value": "2"
119 }
120 ]
121 }
122 },
123 "notebookTask": {
124 "notebookTaskParam": {
125 "jupyterFilePath": "dir1/dir2/notebookFile.ipynb"
126 }
127 },
128 "fileIntegrationTask": {
129 "fileIntegrationTaskParam": {
130 "integrationJobId": "xxxxxxxxx"
131 }
132 },
133 "tableIntegrationTask": {
134 "tableIntegrationTaskParam": {
135 "integrationJobId": "xxxxxxxxxxx"
136 }
137 },
138 "dependentTask": {
139 "dependentTaskParam": {
140 "workspaceId": "workspaceId1",
141 "jobId": "jobId1",
142 "depTaskId": "tid-1",
143 "dateValue": "today",
144 "checkInterval": 300,
145 "failurePolicy": "DEPENDENT_FAILURE_FAILURE",
146 "failureWaitingTime": 60
147 }
148 },
149 "dependsOn": []
150 }
151 ]
152}
请求头域
除公共头域外,无其它特殊头域
请求参数
字段名 | 类型 | 是否必填 | 参数位置 | 说明 |
---|---|---|---|---|
workspaceId | String | 是 | Path参数 | 工作空间ID,不允许为空或者只包含空白字符 |
name | String | 是 | Body参数 | 工作流名称,最大长度256个字符,只能包含数字,大小写字母,中文,横线以及下划线 |
description | String | 是 | Body参数 | 工作流描述,最大长度500个字符 |
tasks | Task[] | 是 | Body参数 | 工作流包含的任务列表 |
Task
task为工作流的子节点,目前支持的类型有:
- RayTask: Ray任务
- CompoundTask:算子任务,算子任务由多个算子构成
- SparkTask: SparkJar任务
- PySparkTask: PySpark任务
- NotebookTask: Notebook任务,
- FileIntegrationTask: 文件采集任务
- TableIntegrationTask: 库表采集任务
- dependentTask: 依赖检查任务
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
id | String | 是 | 任务 ID,最大长度20个字符,并且以tid-开始,且后续字符只支持小写字母和数字 |
name | String | 是 | 任务名称, 最大长度256个字符,且只能包含数字, 大小写字母, 中文,下划线以及横线,不可只包含空白字符 |
clusterList | ClusterConf[] | 否 | 计算实例列表 |
description | String | 否 | 任务描述,最大长度500个字符 |
rayTask | RayTask | 否 | Ray 任务配置, 与其他类型的任务(Task)互斥 |
compoundTask | CompoundTask | 否 | 算子任务任务配置,与其他类型的任务(Task)互斥 |
sparkTask | SparkTask | 否 | SparkJar任务配置,与其他类型的任务(Task)互斥 |
pySparkTask | PySparkTask | 否 | PySpark任务配置,与其他类型的任务(Task)互斥 |
fileIntegrationTask | FileIntegrationTask | 否 | 文件采集任务配置, 与其他类型的任务(Task)互斥 |
tableIntegrationTask | TableIntegrationTask | 否 | 库表采集任务配置, 与其他类型的任务(Task)互斥 |
notebookTask | NotebookTask | 否 | Notebook任务配置, 与其他类型的任务(Task)互斥 |
dependentTask | DependentTask | 否 | 依赖组件任务配置, 与其他类型的任务(Task)互斥 |
dependsOn | String[] | 是 | 任务依赖的所有上游任务的ID列表,如果没有上游任务,则传入空列表[] |
一个任务仅支持一种类型,即不允许提供多个任务配置。
响应头域
除公共头域外,无其它特殊头域
响应参数
参数名称 | 类型 | 描述 |
---|---|---|
requestId | String | 请求ID |
code | String | 响应码 |
message | String | 响应描述信息 |
result | String | 工作流Id |
错误码
错误码 | 错误描述 | HTTP状态码 | 中文解释 |
---|---|---|---|
AccessDenied | 无操作权限 | 403 | 暂无操作权限 |
IllegalArgument | 参数非法 | 400 | 用户提供参数错误 |
请求示例
提交包含一个Ray任务的工作流
Ray任务的参数通过RayTaskParam
类型的参数taskParam
指定,具体描述如下
RayTask.RayTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
codePath | string | 是 | 工作区内Ray任务程序代码路径 |
entryPoint | string | 是 | Ray任务入口命令,形如 python my_task.py |
runtimeEnv | TaskEnvVar[] | 否 | key-value格式的运行环境变量列表 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-ray",
17 "name": "RayTask",
18 "description": "Ray 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "cluster_ray_01",
22 "engineType": "RAY",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "rayTask": {
27 "taskParam": {
28 "codePath": "ray/code/path",
29 "entryPoint": "main.py",
30 "runtimeEnv": [
31 {
32 "key": "ENV1",
33 "value": "value1"
34 }
35 ]
36 }
37 },
38 "dependsOn": []
39 }
40 ]
41}
提交包含一个算子任务的工作流
算子任务的参数通过CompoundTaskParam
类型的参数taskParam
指定,具体描述如下:
CompoundTask
字段名 | 字段类型 | 是否必填 | 字段说明 |
---|---|---|---|
taskParam | CompoundTaskParam | 是 | 算子任务参数,包含算子并发度配置 |
operators | Operator[] | 是 | 算子任务包含的算子列表 |
CompoundTask.CompoundTaskParam
字段名 | 字段类型 | 是否必填 | 字段说明 |
---|---|---|---|
parallel | int | 是 | CompoundTask任务下所有算子的默认并发 |
CompoundTask.Operator
字段名 | 字段类型 | 是否必填 | 字段说明 |
---|---|---|---|
dependsOn | String[] | 是 | 当前算子的所有上游算子的ID列表,若无上游,则提供空列表[]即可 |
id | String | 是 | 当前算子ID, 算子id最大长度为20个字符,并以opid-开头,且后续只包含数字和小写字母 |
metaData | MetaData | 是 | 算子元信息,包含数据仓库(catalog),数据模型(schema), 算子名称以及算子版本信息 |
name | String | 是 | 算子名称,最大长度为256,仅支持大小写字母,数字,中文以及下划线和横线 |
params | OperatorParam[] | 是 | key-value格式的算子参数的列表 |
CompoundTask.Operator.Metadata
字段名 | 字段类型 | 是否必填 | 字段说明 |
---|---|---|---|
catalogName | string | 是 | 算子归属的数据目录catalog名称 |
operatorName | string | 是 | 算子在元数据中名称 |
schemaName | string | 是 | 算子归属的数据模式database/schema名称 |
version | string | 是 | 算子版本 |
CompoundTask.Operator.OperatorParam
字段名 | 字段类型 | 是否必填 | 字段说明 |
---|---|---|---|
key | string | 是 | 算子参数名 |
value | string | 是 | 算子参数值 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-compound",
17 "name": "Compound 任务",
18 "description": "Compound 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "cluster_compound_01",
22 "engineType": "Ray",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "compoundTask": {
27 "taskParam": {
28 "parallel": 5
29 },
30 "operators": [
31 {
32 "id": "opid-0",
33 "name": "TestOp1",
34 "params": [
35 {
36 "key": "paramKey",
37 "value": "paramValue",
38 "valueType": "STRING"
39 }
40 ],
41 "metaData": {
42 "catalogName": "catalog_example",
43 "schemaName": "schema_example",
44 "operatorName": "operator_name",
45 "version": "v1"
46 },
47 "dependsOn": [
48 "operator_dep_01"
49 ]
50 }
51 ]
52 },
53 "dependsOn": [
54
55 ]
56 }
57 ]
58}
提交包含一个SparkJar和PySpark任务的工作流
示例中包含一个SparkJar任务和一个PySpark任务,后者为前者的下游任务。其中SparkJar
任务的参数通过SparkTaskParam
类型的参数sparkTaskParam
指定,PySparkTask
任务的参数通过PySparkParam
类型的参数pySparkTaskParam
的参数pySparkTaskParam
指定。具体描述如下:
SparkTask.SparkTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
dependentLibraries | String[] | 是 | 依赖的 JAR 包以及zip包在工作区内的路径的列表 |
mainClass | String | 是 | Spark 应用的主类名 |
mainClassArgs | String[] | 是 | 主类参数列表,比如 --arg1 value1 --arg2 value2 value3这样的一组参数,需要通过["--arg1", "value1", "--arg2", "value2", "value3"]这样的方式传入 |
envVars | TaskEnvVar[] | 否 | key-value格式的环境变量列表 |
sparkConf | SparkConfEntry[] | 否 | key-value格式的Spark 配置项列表 |
templateId | String | 是 | 任务实例模板 ID |
PySparkTask.PySparkTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
mainApplicationPyFile | String | 是 | PySpark任务的主程序 Python 文件在工作区内的路径 |
dependentLibraries | String[] | 是 | py以及zip类型的依赖库在工作区内路径列表 |
mainClassArgs | String[] | 是 | 主类参数列表,比如 --arg1 value1 --arg2 value2 value3这样的一组参数,需要通过["--arg1", "value1", "--arg2", "value2", "value3"]这样的方式传入 |
envVars | TaskEnvVar[] | 否 | key-value格式的环境变量列表 |
templateId | String | 是 | 任务模板 ID |
sparkConf | SparkConfEntry[] | 否 | Spark 配置项列表 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-spark",
17 "name": "SparkTask",
18 "description": "Spark 任务示例",
19 "sparkTask": {
20 "sparkTaskParam": {
21 "mainClass": "com.example.MainClass",
22 "dependentLibraries": [
23 "lib1.jar"
24 ],
25 "mainClassArgs": [
26 "arg1"
27 ],
28 "envVars": [
29 {
30 "key": "SPARK_ENV",
31 "value": "production"
32 }
33 ],
34 "sparkConf": [
35 {
36 "key": "spark.executor.memory",
37 "value": "4g"
38 }
39 ],
40 "templateId": "template_01"
41 }
42 },
43 "dependsOn": [
44 ]
45 },
46 {
47 "id": "tid-pyspark",
48 "name": "PySparkTask",
49 "description": "PySpark 任务示例",
50 "pySparkTask": {
51 "pySparkTaskParam": {
52 "mainApplicationPyFile": "main.py",
53 "dependentLibraries": [
54 "pyspark_lib1.zip"
55 ],
56 "mainClassArgs": [
57 "arg1"
58 ],
59 "envVars": [
60 {
61 "key": "PYSPARK_ENV",
62 "value": "dev"
63 }
64 ],
65 "templateId": "template_pyspark",
66 "sparkConf": [
67 {
68 "key": "spark.executor.cores",
69 "value": "2"
70 }
71 ]
72 }
73 },
74 "dependsOn": [
75 "tid-spark"
76 ]
77 }
78 ]
79}
提交包含一个Notebook任务的工作流
Notebook
任务的参数通过NotebookTaskParam
类型的参数notebookTaskParam
指定,具体描述如下:
NotebookTask.NotebookTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
jupyterFilePath | String | 是 | 工作空间内Notebook文件路径 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-notebook",
17 "name": "NotebookTask",
18 "description": "Notebook 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "compute_xxxxx",
22 "engineType": "SPARK",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "notebookTask": {
27 "notebookTaskParam": {
28 "jupyterFilePath": "/path/to/notebook.ipynb"
29 }
30 },
31 "dependsOn": [
32
33 ]
34 }
35 ]
36}
提交包含库表采集任务和文件采集任务的工作流
本示例中同时包含库表采集任务和文件采集任务,且本例中库表采集任务为文件采集任务的下游任务。文件采集任务的参数通过FileIntegrationJobTaskParam
类型的参数fileIntegrationTaskParam
指定,库表采集任务的参数通过TableIntegrationJobTaskParam
类型的参数tableIntegrationTaskParam
指定。具体描述如下:
FileIntegrationJobTask.FileIntegrationJobTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
integrationJobId | String | 是 | 文件采集任务参数对象 |
TableIntegrationJobTask.TableIntegrationJobTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
integrationJobId | String | 是 | 文件采集任务ID |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-fileinte",
17 "name": "FileIntegration 任务",
18 "description": "文件集成任务示例",
19 "fileIntegrationTask": {
20 "fileIntegrationTaskParam": {
21 "integrationJobId": "xxxxxxxxx"
22 }
23 },
24 "dependsOn": []
25 },
26 {
27 "id": "tid-tableinte",
28 "name": "TableIntegration 任务",
29 "description": "表集成任务示例",
30 "tableIntegrationTask": {
31 "tableIntegrationTaskParam": {
32 "integrationJobId": "xxxxxxxxxxx"
33 }
34 },
35 "dependsOn": [
36 "tid-fileintegration"
37 ]
38 }
39 ]
40}
提交包含依赖检查任务和Ray任务的工作流
本示例中包含一个依赖检查任务以及一个Ray
任务,且Ray
任务为依赖检查任务的下游。依赖检查任务的参数由DependentTaskParam
类型的参数dependentTaskParam
指定,具体描述如下:
DependTask.DependTaskParam
字段名 | 类型 | 是否必填 | 说明 |
---|---|---|---|
workspaceId | String | 是 | 依赖任务的工作空间ID,不能为空且不能是空白字符 |
jobId | String | 是 | 依赖任务的工作流ID,不能为空且不能是空白字符 |
depTaskId | String | 否 | 依赖任务的任务ID |
dateValue | String | 是 | 指定被依赖任务的完成时间段,在依赖检查时会在指定的时间段内查找被依赖任务是否有成功执行的记录,支持如下的配置 1. 小时级别 2. 天级别 3. 周级别 4. 月级别 |
checkInterval | Int | 是 | 检查间隔(单位: 秒) |
failurePolicy | String | 是 | 失败处理策略,目前支持如下的两种策略 1. DEPENDENT_FAILURE_FAILURE: 依赖组件失败则任务bai 2. DEPENDENT_FAILURE_WAITING: 依赖组件失败则任务等待 |
failureWaitingTime | Int | 是 | 失败等待时间(单位: 分) |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-dependent",
17 "name": "Dependent 任务",
18 "description": "依赖任务示例",
19 "dependentTask": {
20 "dependentTaskParam": {
21 "workspaceId": "workspace_001",
22 "jobId": "job_001",
23 "depTaskId": "tid-other",
24 "dateValue": "today",
25 "checkInterval": 300,
26 "failurePolicy": "DEPENDENT_FAILURE_FAILURE",
27 "failureWaitingTime": 60
28 }
29 },
30 "dependsOn": []
31 },
32 {
33 "id": "tid-ray",
34 "name": "RayTask",
35 "description": "Ray 任务示例",
36 "clusterList": [
37 {
38 "clusterId": "cluster_ray_01",
39 "engineType": "Ray",
40 "clusterType": "RESIDENT"
41 }
42 ],
43 "rayTask": {
44 "taskParam": {
45 "codePath": "ray/code/path",
46 "entryPoint": "main.py",
47 "runtimeEnv": [
48 {
49 "key": "ENV1",
50 "value": "value1"
51 }
52 ]
53 }
54 },
55 "dependsOn": ["tid-dependent"]
56 }
57 ]
58}
响应示例
1HTTP/1.1 200 OK
2Content-Type: application/json
3Transfer-Encoding: chunked
4Date: Mon, 28 Jul 2025 05:36:55 GMT
5Keep-Alive: timeout=180
6Connection: keep-alive
7{
8 "requestId": "cb17c0b8-3fc4-4015-8604-2dfc3f0773a7",
9 "code": "SUCCESS",
10 "result": "jid_a63b52e1fba4ab08"
11}