创建工作流
接口描述
在指定工作空间下创建工作流
权限说明
用户在工作空间下创建工作流,需具有工作空间下新建工作流WORKFLOW_CREATE权限
注意事项
- 工作空间不存在时,创建工作流会失败
- 如果当前用户没有新建工作流
WORKFLOW_CREATE权限,创建工作流会失败
请求结构
1POST /v2/workspaces/{workspaceId}/job HTTP/1.1
2Host: databuilder.bd.baidubce.com
3Authorization: authorization string
4
5{
6 "name": "job_example",
7 "description": "示例作业描述",
8 "tasks": [
9 {
10 "id": "tid-ray",
11 "name": "RayTask",
12 "description": "Ray 任务示例",
13 "clusterList": [
14 {
15 "clusterId": "compute_xxxxx",
16 "engineType": "RAY",
17 "clusterType": "RESIDENT"
18 }
19 ],
20 "rayTask": {
21 "taskParam": {
22 "codePath": "dir1/dir2/main.py",
23 "entryPoint": "python main.py",
24 "runtimeEnv": [
25 {
26 "key": "ENV1",
27 "value": "value1"
28 }
29 ]
30 }
31 },
32 "compoundTask": {
33 "taskParam": {
34 "parallel": 1
35 },
36 "operators": [
37 {
38 "id": "opid-1",
39 "name": "opName1",
40 "params": [
41 {
42 "key": "paramKey1",
43 "value": "paramValue1"
44 }
45 ],
46 "metaData": {
47 "catalogName": "catalog1",
48 "schemaName": "schema1",
49 "operatorName": "operator1",
50 "version": "v1"
51 },
52 "dependsOn": []
53 },
54 {
55 "id": "opid-2",
56 "name": "opName2",
57 "params": [
58 {
59 "key": "paramKey2",
60 "value": "paramValue2"
61 }
62 ],
63 "metaData": {
64 "catalogName": "catalog2",
65 "schemaName": "schema2",
66 "operatorName": "operator2",
67 "version": "v1"
68 },
69 "dependsOn": [
70 "opid-1"
71 ]
72 }
73 ]
74 },
75 "sparkTask": {
76 "sparkTaskParam": {
77 "mainClass": "com.example.MainClass",
78 "dependentLibraries": [
79 "lib1.jar"
80 ],
81 "mainClassArgs": [
82 "arg1"
83 ],
84 "envVars": [
85 {
86 "key": "SPARK_ENV",
87 "value": "production"
88 }
89 ],
90 "sparkConf": [
91 {
92 "key": "spark.executor.memory",
93 "value": "4g"
94 }
95 ],
96 "templateId": "template_01"
97 }
98 },
99 "pySparkTask": {
100 "pySparkTaskParam": {
101 "mainApplicationPyFile": "dir1/dir2/main.py",
102 "dependentLibraries": [
103 "pyspark_lib1.zip"
104 ],
105 "mainClassArgs": [
106 "arg1"
107 ],
108 "envVars": [
109 {
110 "key": "PYSPARK_ENV",
111 "value": "dev"
112 }
113 ],
114 "templateId": "template_pyspark",
115 "sparkConf": [
116 {
117 "key": "spark.executor.cores",
118 "value": "2"
119 }
120 ]
121 }
122 },
123 "notebookTask": {
124 "notebookTaskParam": {
125 "jupyterFilePath": "dir1/dir2/notebookFile.ipynb"
126 }
127 },
128 "fileIntegrationTask": {
129 "fileIntegrationTaskParam": {
130 "integrationJobId": "xxxxxxxxx"
131 }
132 },
133 "tableIntegrationTask": {
134 "tableIntegrationTaskParam": {
135 "integrationJobId": "xxxxxxxxxxx"
136 }
137 },
138 "dependentTask": {
139 "dependentTaskParam": {
140 "workspaceId": "workspaceId1",
141 "jobId": "jobId1",
142 "depTaskId": "tid-1",
143 "dateValue": "today",
144 "checkInterval": 300,
145 "failurePolicy": "DEPENDENT_FAILURE_FAILURE",
146 "failureWaitingTime": 60
147 }
148 },
149 "dependsOn": [],
150 "alertStrategies": [
151 {
152 "incidentAction": "55",
153 "alertScenes": [
154 "START","TIMEOUT"
155 ],
156 "timeout": 10
157 }
158 ]
159 }
160 ],
161 "alertStrategies": [
162 {
163 "incidentAction": "55",
164 "alertScenes": [
165 "START","TIMEOUT"
166 ],
167 "timeout": 10
168 }
169 ]
170}
请求头域
除公共头域外,无其它特殊头域
请求参数
| 字段名 | 类型 | 是否必填 | 参数位置 | 说明 |
|---|---|---|---|---|
workspaceId |
String |
是 | Path参数 | 工作空间ID,不允许为空或者只包含空白字符 |
name |
String |
是 | Body参数 | 工作流名称,最大长度256个字符,只能包含数字,大小写字母,中文,横线以及下划线 |
description |
String |
是 | Body参数 | 工作流描述,最大长度500个字符 |
tasks |
Task[] |
是 | Body参数 | 工作流包含的任务列表 |
alertStrategies |
AlertStrategy[] |
否 | Body参数 | 工作流告警策略 |
Task
Task为工作流的子节点,目前支持的类型有:
- RayTask: Ray任务
- CompoundTask:算子任务,算子任务由多个算子构成
- SparkTask: SparkJar任务
- PySparkTask: PySpark任务
- NotebookTask: Notebook任务,
- FileIntegrationTask: 文件采集任务
- TableIntegrationTask: 库表采集任务
- dependentTask: 依赖检查任务
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
id |
String |
是 | 任务 ID,最大长度20个字符,并且以tid-开始,且后续字符只支持小写字母和数字 |
name |
String |
是 | 任务名称, 最大长度256个字符,且只能包含数字, 大小写字母, 中文,下划线以及横线,不可只包含空白字符 |
clusterList |
ClusterConf[] |
否 | 计算实例列表 |
description |
String |
否 | 任务描述,最大长度500个字符 |
rayTask |
RayTask |
否 | Ray 任务配置, 与其他类型的任务(Task)互斥 |
compoundTask |
CompoundTask |
否 | 算子任务任务配置,与其他类型的任务(Task)互斥 |
sparkTask |
SparkTask |
否 | SparkJar任务配置,与其他类型的任务(Task)互斥 |
pySparkTask |
PySparkTask |
否 | PySpark任务配置,与其他类型的任务(Task)互斥 |
fileIntegrationTask |
FileIntegrationTask |
否 | 文件采集任务配置, 与其他类型的任务(Task)互斥 |
tableIntegrationTask |
TableIntegrationTask |
否 | 库表采集任务配置, 与其他类型的任务(Task)互斥 |
notebookTask |
NotebookTask |
否 | Notebook任务配置, 与其他类型的任务(Task)互斥 |
dependentTask |
DependentTask |
否 | 依赖组件任务配置, 与其他类型的任务(Task)互斥 |
dependsOn |
String[] |
是 | 任务依赖的所有上游任务的ID列表,如果没有上游任务,则传入空列表[] |
alertStrategies |
AlertStrategy[] |
否 | 任务告警策略 |
Task.ClusterConf
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
clusterId |
string |
是 | 计算实例ID |
clusterType |
string |
是 | 计算实例类型,包括RESIDENT: 常驻集群EPHEMERAL: 非常驻集群该字段仅支持常驻集群RESIDENT,若要选择非常驻集群,则需要通过给任务指定任务实例模板ID,即templateId的方式来使用(目前只有Spark/PySpark任务支持配置任务实例模板) |
engineType |
string |
是 | 计算实例引擎类型, 在工作流中支持如下的引擎RAR:Ray集群, 用于算子任务以及Ray任务DORIS: 支持notebook任务SPARK: 支持Notebooke任务 |
一个任务仅支持一种类型,即不允许提供多个任务配置。
AlertStrategy
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
incidentAction |
String |
否 | bcm通知模版id,可以从bcm页面获取到指定通知模版对应的id |
alertScenes |
String[] |
否 | 告警场景:START(启动)、SUCCESS(运行成功)、FAILURE(运行失败)、TIMEOUT(运行超时) |
timeout |
Integer |
否 | 超时时间,单位:分钟/min,只有告警场景里勾选了TIMEOUT(运行超时)之后才会设置该值,如果存在多条告警同时设置了超时时间,以第一个设置值生效 |
alarmEventPolicyName |
String |
否 | bcm侧的事件报警策略名称,创建/修改工作流更新告警策略时该值为空,修改工作流不更新告警策略时需要回传该值 |
响应头域
除公共头域外,无其它特殊头域
响应参数
| 参数名称 | 类型 | 描述 |
|---|---|---|
requestId |
String |
请求ID |
code |
String |
响应码 |
message |
String |
任务失败时,通过该字段返回错误详情 |
result |
String |
任务成功时,返回工作流Id |
错误码
| 错误码 | 错误描述 | HTTP状态码 | 中文解释 |
|---|---|---|---|
AccessDenied |
无操作权限 | 403 | 暂无操作权限 |
IllegalArgument |
参数非法 | 400 | 用户提供参数错误 |
请求示例
提交包含一个Ray任务的工作流
Ray任务的参数通过RayTaskParam类型的参数taskParam指定,具体描述如下
RayTask.RayTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
codePath |
string |
是 | 工作区内Ray任务程序代码路径 |
entryPoint |
string |
是 | Ray任务入口命令,形如 python my_task.py |
runtimeEnv |
TaskEnvVar[] |
否 | key-value格式的运行环境变量列表 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-ray",
17 "name": "RayTask",
18 "description": "Ray 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "compute_xxxxx",
22 "engineType": "RAY",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "rayTask": {
27 "taskParam": {
28 "codePath": "ray/code/path",
29 "entryPoint": "main.py",
30 "runtimeEnv": [
31 {
32 "key": "ENV1",
33 "value": "value1"
34 }
35 ]
36 }
37 },
38 "dependsOn": []
39 }
40 ]
41}
提交包含一个算子任务的工作流
算子任务的参数通过CompoundTaskParam类型的参数taskParam指定,具体描述如下:
CompoundTask
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
taskParam |
CompoundTaskParam |
是 | 算子任务参数,包含算子并发度配置 |
operators |
Operator[] |
是 | 算子任务包含的算子列表 |
CompoundTask.CompoundTaskParam
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
parallel |
int |
是 | CompoundTask任务下所有算子的默认并发 |
CompoundTask.Operator
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
dependsOn |
String[] |
是 | 当前算子的所有上游算子的ID列表,若无上游,则提供空列表[]即可 |
id |
String |
是 | 当前算子ID, 算子id最大长度为20个字符,并以opid-开头,且后续只包含数字和小写字母 |
metaData |
MetaData |
是 | 算子元信息,包含数据仓库(catalog),数据模型(schema), 算子名称以及算子版本信息 |
name |
String |
是 | 算子名称,最大长度为256,仅支持大小写字母,数字,中文以及下划线和横线 |
params |
OperatorParam[] |
是 | key-value格式的算子参数的列表 |
CompoundTask.Operator.Metadata
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
catalogName |
string |
是 | 算子归属的数据目录catalog名称 |
operatorName |
string |
是 | 算子在元数据中名称 |
schemaName |
string |
是 | 算子归属的数据模式database/schema名称 |
version |
string |
是 | 算子版本 |
CompoundTask.Operator.OperatorParam
| 字段名 | 字段类型 | 是否必填 | 字段说明 |
|---|---|---|---|
key |
string |
是 | 算子参数名 |
value |
string |
是 | 算子参数值 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-compound",
17 "name": "CompoundTask",
18 "description": "Compound 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "compute_xxxxx",
22 "engineType": "RAY",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "compoundTask": {
27 "taskParam": {
28 "parallel": 5
29 },
30 "operators": [
31 {
32 "id": "opid-0",
33 "name": "TestOp1",
34 "params": [
35 {
36 "key": "paramKey1",
37 "value": "paramValue1"
38 }
39 ],
40 "metaData": {
41 "catalogName": "catalog1",
42 "schemaName": "schema1",
43 "operatorName": "operator1",
44 "version": "v1"
45 },
46 "dependsOn": []
47 },
48 {
49 "id": "opid-1",
50 "name": "TestOp2",
51 "params": [
52 {
53 "key": "paramKey2",
54 "value": "paramValue2"
55 }
56 ],
57 "metaData": {
58 "catalogName": "catalog2",
59 "schemaName": "schema2",
60 "operatorName": "operator2",
61 "version": "v1"
62 },
63 "dependsOn": ["opid-0"]
64 }
65 ]
66 },
67 "dependsOn": [
68
69 ]
70 }
71 ]
72}
提交包含一个SparkJar和PySpark任务的工作流
示例中包含一个SparkJar任务和一个PySpark任务,后者为前者的下游任务。其中SparkJar任务的参数通过SparkTaskParam类型的参数sparkTaskParam指定,PySparkTask任务的参数通过PySparkParam类型的参数pySparkTaskParam的参数pySparkTaskParam指定。具体描述如下:
SparkTask.SparkTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
dependentLibraries |
String[] |
是 | 依赖的 JAR 包以及zip包在工作区内的路径的列表 |
mainClass |
String |
是 | Spark 应用的主类名 |
mainClassArgs |
String[] |
是 | 主类参数列表,比如 --arg1 value1 --arg2 value2 value3这样的一组参数,需要通过["--arg1", "value1", "--arg2", "value2", "value3"]这样的方式传入 |
envVars |
TaskEnvVar[] |
否 | key-value格式的环境变量列表 |
sparkConf |
SparkConfEntry[] |
否 | key-value格式的Spark 配置项列表 |
templateId |
String |
是 | 任务实例模板 ID |
PySparkTask.PySparkTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
mainApplicationPyFile |
String |
是 | PySpark任务的主程序 Python 文件在工作区内的路径 |
dependentLibraries |
String[] |
是 | py以及zip类型的依赖库在工作区内路径列表 |
mainClassArgs |
String[] |
是 | 主类参数列表,比如--arg1 value1 --arg2 value2 value3这样的一组参数,需要通过["--arg1", "value1", "--arg2", "value2", "value3"]这样的方式传入 |
envVars |
TaskEnvVar[] |
否 | key-value格式的环境变量列表 |
templateId |
String |
是 | 任务模板 ID |
sparkConf |
SparkConfEntry[] |
否 | Spark 配置项列表 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-spark",
17 "name": "SparkTask",
18 "description": "Spark 任务示例",
19 "sparkTask": {
20 "sparkTaskParam": {
21 "mainClass": "com.example.MainClass",
22 "dependentLibraries": [
23 "lib1.jar"
24 ],
25 "mainClassArgs": [
26 "arg1"
27 ],
28 "envVars": [
29 {
30 "key": "SPARK_ENV",
31 "value": "production"
32 }
33 ],
34 "sparkConf": [
35 {
36 "key": "spark.executor.memory",
37 "value": "4g"
38 }
39 ],
40 "templateId": "template_01"
41 }
42 },
43 "dependsOn": [
44 ]
45 },
46 {
47 "id": "tid-pyspark",
48 "name": "PySparkTask",
49 "description": "PySpark 任务示例",
50 "pySparkTask": {
51 "pySparkTaskParam": {
52 "mainApplicationPyFile": "main.py",
53 "dependentLibraries": [
54 "pyspark_lib1.zip"
55 ],
56 "mainClassArgs": [
57 "arg1"
58 ],
59 "envVars": [
60 {
61 "key": "PYSPARK_ENV",
62 "value": "dev"
63 }
64 ],
65 "templateId": "template_pyspark",
66 "sparkConf": [
67 {
68 "key": "spark.executor.cores",
69 "value": "2"
70 }
71 ]
72 }
73 },
74 "dependsOn": [
75 "tid-spark"
76 ]
77 }
78 ]
79}
提交包含一个Notebook任务的工作流
Notebook任务的参数通过NotebookTaskParam类型的参数notebookTaskParam指定,具体描述如下:
NotebookTask.NotebookTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
jupyterFilePath |
String |
是 | 工作空间内Notebook文件路径 |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-notebook",
17 "name": "NotebookTask",
18 "description": "Notebook 任务示例",
19 "clusterList": [
20 {
21 "clusterId": "compute_xxxxx",
22 "engineType": "SPARK",
23 "clusterType": "RESIDENT"
24 }
25 ],
26 "notebookTask": {
27 "notebookTaskParam": {
28 "jupyterFilePath": "/path/to/notebook.ipynb"
29 }
30 },
31 "dependsOn": [
32
33 ]
34 }
35 ]
36}
提交包含库表采集任务和文件采集任务的工作流
本示例中同时包含库表采集任务和文件采集任务,且本例中库表采集任务为文件采集任务的下游任务。文件采集任务的参数通过FileIntegrationJobTaskParam类型的参数fileIntegrationTaskParam指定,库表采集任务的参数通过TableIntegrationJobTaskParam类型的参数tableIntegrationTaskParam指定。具体描述如下:
FileIntegrationJobTask.FileIntegrationJobTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
integrationJobId |
String |
是 | 文件采集任务参数对象 |
TableIntegrationJobTask.TableIntegrationJobTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
integrationJobId |
String |
是 | 文件采集任务ID |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-fileintegration",
17 "name": "FileIntegrationTask",
18 "description": "文件采集任务示例",
19 "fileIntegrationTask": {
20 "fileIntegrationTaskParam": {
21 "integrationJobId": "xxxxxxxxx"
22 }
23 },
24 "dependsOn": []
25 },
26 {
27 "id": "tid-tableintegration",
28 "name": "TableIntegrationTask",
29 "description": "库表采集任务示例",
30 "tableIntegrationTask": {
31 "tableIntegrationTaskParam": {
32 "integrationJobId": "xxxxxxxxxxx"
33 }
34 },
35 "dependsOn": [
36 "tid-fileintegration"
37 ]
38 }
39 ]
40}
提交包含依赖检查任务和Ray任务的工作流
本示例中包含一个依赖检查任务以及一个Ray任务,且Ray任务为依赖检查任务的下游。依赖检查任务的参数由DependentTaskParam类型的参数dependentTaskParam指定,具体描述如下:
DependTask.DependTaskParam
| 字段名 | 类型 | 是否必填 | 说明 |
|---|---|---|---|
workspaceId |
String |
是 | 依赖任务的工作空间ID,不能为空且不能是空白字符 |
jobId |
String |
是 | 依赖任务的工作流ID,不能为空且不能是空白字符 |
depTaskId |
String |
否 | 依赖任务的任务ID |
dateValue |
String |
是 | 指定被依赖任务的完成时间段,在依赖检查时会在指定的时间段内查找被依赖任务是否有成功执行的记录,支持如下的配置小时级别currentHour: 当前小时last1Hour: 上一小时last2hours: 上两小时last3Hours: 上三小时last12Hours: 上十二小时last24Hours: 上二十四小时天级别today: 今天last1Days: 昨天last2Days: 前两天last3Days: 前三天last7Days: 最近七天周级别thisWeek: 本周lastWeek: 上周last2Weeks: 上两周last3Weeks: 上三周lastMonday: 上周一lastTuesday: 上周二lastWednesday: 上周三lastThursday: 上周四lastFriday: 上周五lastSaturday: 上周六lastSunday: 上周日月级别thisMouth: 本月``thisMonthBegin: 本月初thisMonthEnd: 本月底lastMonth: 上个月lastMonthBegin: 上月初lastMonthEnd: 上月底 |
checkInterval |
Int |
是 | 检查间隔(单位: 秒) |
failurePolicy |
String |
是 | 失败处理策略,目前支持如下的两种策略DEPENDENT_FAILURE_FAILURE: 依赖组件失败则任务baiDEPENDENT_FAILURE_WAITING: 依赖组件失败则任务等待 |
failureWaitingTime |
Int |
是 | 失败等待时间(单位: 分) |
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "tasks": [
15 {
16 "id": "tid-dependent",
17 "name": "DependentTask",
18 "description": "依赖检查任务示例",
19 "dependentTask": {
20 "dependentTaskParam": {
21 "workspaceId": "workspace_001",
22 "jobId": "job_001",
23 "depTaskId": "tid-other",
24 "dateValue": "today",
25 "checkInterval": 300,
26 "failurePolicy": "DEPENDENT_FAILURE_FAILURE",
27 "failureWaitingTime": 60
28 }
29 },
30 "dependsOn": []
31 },
32 {
33 "id": "tid-ray",
34 "name": "RayTask",
35 "description": "Ray 任务示例",
36 "clusterList": [
37 {
38 "clusterId": "compute_xxxxx",
39 "engineType": "RAY",
40 "clusterType": "RESIDENT"
41 }
42 ],
43 "rayTask": {
44 "taskParam": {
45 "codePath": "ray/code/path",
46 "entryPoint": "main.py",
47 "runtimeEnv": [
48 {
49 "key": "ENV1",
50 "value": "value1"
51 }
52 ]
53 }
54 },
55 "dependsOn": ["tid-dependent"]
56 }
57 ]
58}
提交包含变量的工作流(以算子任务为例)
工作流支持在工作流描述中使用形如${varName}的占位符的方式类嵌入变量,变量通过工作流的globalParams参数来维护。并且使用变量时有如下约束
- 工作流的
globalParams维护的参数的变量的key需要唯一 - 创建工作流时,
globalParams中需要包含工作流描述中所有的变量 - 在Volume路径中增加变量时,需要满足volume路径的格式,即必须以/Volumes开头,且至少包含4级路径
下面的示例中,分别在算子TestOp1和算子TestOp2中做了变量替换。其中,前者使用变量来作为参数,后者将变量作为参数的一部分
1POST https://databuilder.bd.baidubce.com/v2/workspaces/workspace_9862_0763f283d58a/job
2X-Region: bd
3X-Via: api-gateway
4Authorization: xxxxxx
5Host: host
6X-Bce-Request-Id: 75f4d4ad-d478-4494-95cd-74449a33365a
7Accept: */*
8Accept-Encoding: gzip, deflate, br
9Connection: keep-alive
10
11{
12 "name": "job_example",
13 "description": "示例作业描述",
14 "globalParams": [
15 {
16 "key": "key1"
17 "value": "value1"
18 },
19 {
20 "key": "key2"
21 "value": "value2"
22 }
23 ]
24 "tasks": [
25 {
26 "id": "tid-compound",
27 "name": "CompoundTask",
28 "description": "Compound 任务示例",
29 "clusterList": [
30 {
31 "clusterId": "compute_xxxxx",
32 "engineType": "RAY",
33 "clusterType": "RESIDENT"
34 }
35 ],
36 "compoundTask": {
37 "taskParam": {
38 "parallel": 5
39 },
40 "operators": [
41 {
42 "id": "opid-0",
43 "name": "TestOp1",
44 "params": [
45 {
46 "key": "paramKey1",
47 "value": "${key1}"
48 }
49 ],
50 "metaData": {
51 "catalogName": "catalog1",
52 "schemaName": "schema1",
53 "operatorName": "operator1",
54 "version": "v1"
55 },
56 "dependsOn": []
57 },
58 {
59 "id": "opid-1",
60 "name": "TestOp2",
61 "params": [
62 {
63 "key": "paramKey2",
64 "value": "param${key2}"
65 }
66 ],
67 "metaData": {
68 "catalogName": "catalog2",
69 "schemaName": "schema2",
70 "operatorName": "operator2",
71 "version": "v1"
72 },
73 "dependsOn": ["opid-0"]
74 }
75 ]
76 },
77 "dependsOn": [
78
79 ]
80 }
81 ]
82}
响应示例
1HTTP/1.1 200 OK
2Content-Type: application/json
3Transfer-Encoding: chunked
4Date: Mon, 28 Jul 2025 05:36:55 GMT
5Keep-Alive: timeout=180
6Connection: keep-alive
7{
8 "requestId": "cb17c0b8-3fc4-4015-8604-2dfc3f0773a7",
9 "code": "SUCCESS",
10 "result": "jid_a63b52e1fba4ab08"
11}
