CCE Paddleflow Pipeline说明
组件介绍
CCE Paddleflow Pipeline组件是基于云原生架构的AI工作流组件,可实现编排AI作业,模板化AI作业的训练流程,提升实验效率。
组件功能
- 制定工作流定义规范。通过yaml规范,支持以有向无环图(DAG)的形式定义多个节点间的运行参数,以及运行关系。
- pipeline yaml定义规范:可以参考yaml定义规范
- 支持工作流,工作流任务管理。通过命令行(CLI),python SDK等形式,支持工作流的增删查改,以及工作流任务的管理,实现工作流定义的复用。
使用场景
使用工作流引擎实现在AI开发过程中端到端的流程编排与调度。
限制说明
- 仅支持 v1.18 和 v1.20 版本的 Kubernetes 集群。
安装组件
- 登录百度智能云官网,并进入管理控制台。
- 选择“产品服务 > 云原生 > 容器引擎 CCE”,单击进入容器引擎管理控制台。
- 单击左侧导航栏中的 集群管理 > 集群列表 。
- 在集群列表页面中,单击目标集群名称进入集群管理页面。
- 在集群管理页面单击 组件管理 。
- 在组件管理列表中选择 CCE Paddleflow Pipeline组件单击“安装”。
- 在组件配置页面中完成相关配置:
-
数据库配置
- 若您需要将Paddleflow工作流用于生产环境,推荐您使用百度云RDS实例,需要您输入RDS实例的地址、数据库名称、用户名、密码信息。
- 若您仅用于体验Paddleflow工作流功能,您也可选择集群内置MySQL,系统将自动为您安装MySQL至集群中,但若您误删数据库可能存在数据丢失风险
-
网络配置
- 若您需要公网访问,推荐创建Loadbalancer或Nodeport类型service;若您选择Loadbalancer,可新建BLB或者绑定已有BLB,新建BLB后端会默认创建一个BLB实例,绑定已有BLB会清空现有BLB规则。
- 若您仅需内网访问,可创建CLuster IP类型Service,仅需输入服务端口即可
- 若您需要更多自定义service选项,可参考通过CCE使用K8S_Service
- 点击“安装”按钮完成组件的安装。
组件使用
组件安装完成后,您需要在您的开发机上安装PaddleFlow客户端以使用PaddleFlow工作流完整功能,客户端安装步骤可参考PaddleFlow部署
客户端安装完成后,您需要完成配置文件的配置,即可开始试用PaddleFlow。
配置文件说明
[user]
name = 账户名
password = 账户密码
[server]
paddleflow_server = 127.0.0.1 // paddleflow server 地址
paddleflow_port = 8080 // paddleflow server 端口
定义pipeline
-
在发起工作流任务之前,您需要定义Pipeline。 PaddleFlow Pipeline提供两种定义pipeline方式供您选择:
- 接下来我们将简单依次这两种定义方式
yaml定义规范
下面是一个典型的,最基础的yaml格式pipeline定义。
- 该示例中pipeline定义,以及示例相关运行脚本,来自Paddleflow项目下example/pipeline/base_pipeline示例。
- 示例链接:base_pipeline
name: base_pipeline
docker_env: nginx:1.7.9
entry_points:
preprocess:
command: bash base_pipeline/shells/data.sh {{data_path}}
docker_env: centos:centos7
env:
USER_ABC: 123_{{PF_USER_NAME}}
parameters:
data_path: ./base_pipeline/data/{{PF_RUN_ID}}
train:
command: bash base_pipeline/shells/train.sh {{epoch}} {{train_data}} {{model_path}}
deps: preprocess
parameters:
epoch: 5
model_path: ./output/{{PF_RUN_ID}}
train_data: '{{preprocess.data_path}}'
validate:
command: bash base_pipeline/shells/validate.sh {{model_path}}
deps: train
parameters:
model_path: '{{train.model_path}}'
parallelism: 1
各字段解析
接下来将上述pipeline定义,介绍每个字段的作用
全局字段
在上面的pipeline定义中,在全局级别共有 name,docker_env, entry_points, parallelism 四个字段,这个四个字段的相关说明如下表所示:
字段名 | 含义 | 类型 | 是否必须 | 默认值 | 备注 |
name | pipeline的名字 | str | 是 | 只能由字母数字下划线组成,且以字母或下划线开头
正则表达式: ^[A-Za-z_][A-Za-z0-9_]{0,49}$ |
|
docker_env | docker 镜像地址 | str | 否 | 优先级:节点级别的docker_env参数 > 全局级别的docker_env参数。 | |
entry_points | pipeline 结构 | dict | 是 | 由各个节点组成的有向无环图(DAG)结构 | |
parallelism | 单次pipeline 任务,最大可以同时运行的节点数 | int | 否 | 10 | 默认是10,目前最大不能超过20。
同一个pipeline,发起的不同run,parallelism相互独立。
实际节点运行并发度,也可能会受底层资源影响 |
节点字段
正如前文所述,pipeline是由各个节点组成的有向无环图(DAG)结构,由entry_points字段来指定。 entry_points字段需要是一个dict,其中每一项都将被视为一个节点:
-
其key将会被视为节点的名字,需要满足如下约束:
- 只能由字母,数字,中划线组成,且以字母开头
- 对应正则表达式: ^[a-zA-Z] [a-zA-Z0-9-]{0,29}$
-
其value即为节点的详细定义,其各子字段的含义如下:
字段名 含义 类型 是否必须 备注 docker_env docker镜像地址 string 否 优先级:节点级别的docker_env参数 > 全局级别的docker_env参数。 parameters 节点运行参数 dict 否 key即为 parameter的名字,需要满足如下的约束:
* 只能由字母数字下划线组成,且以字母或下划线开头
* ^[A-Za-z_][A-Za-z0-9_]{0,49}$
value即为parameter的值,在yaml中定义的值将视为其默认值,发起任务时,可以通过接口参数进行更改env 节点运行时的环境变量 dict 否 key为环境变量的名字,需满足如下约束:
* 只能由字母数字下划线组成,且以字母下划线开头
* ^[A-Za-z_][A-Za-z0-9_]{0,49}$
value即为环境变量的值command 节点需要执行的命令 str 是 deps 表示当前节点所以依赖的上游的节点 str 否 若有多个上游节点,上游节点名通过逗号分隔
变量模板与替换
变量模板
在节点定义中,parameters,command,env字段除了可以定义成字符串常量,还可以通过变量模板,引用其他变量。 目前支持的变量模板有三类:
-
系统变量:Paddleflow提供的,系统变量的引用,引用模板包括:
- {{PF_RUN_ID}}:表示当前pipeline run的ID,格式为:run-XXXXXX。例如:run-000032
- {{PF_STEP_NAME}}:当前运行的节点名称
-
{{PF_USER_NAME}}:当前运行的用户
上面的示例中:preprocess节点,env中的USER_ABC变量,引用了{{PF_USER_NAME}}变量
-
同节点其他参数:用于引用同一节点内的其他参数。
上面的示例中: preprocess节点的command变量,引用了同节点parameters中的data_path变量
-
上游节点参数:用于引用上游节点内的参数,格式为{{stepName.paramName}}。
上面的示例中: train节点parameters中,train_data参数使用 {{preprocess.data_path}} 模板,引用了 preprocess 节点的 data_path 参数
变量模板替换流程 下面按照每个节点运行前的变量模板替换顺序,来介绍每个变量支持的模板:
-
parameters 支持以下模板:
- 系统变量
- 上游节点parameters
-
env 支持以下模板:
- 系统变量
- 本step内 parameters
-
command 支持以下模板:
- 系统变量
- 本step内 parameters
DSL定义规范
下面是使用DSL规范定义的pipeline,该pipeline与前文使用yaml规范定义的pipeline在运行时是完全等价的。
- 该示例中pipeline定义,以及示例相关运行脚本,来自Paddleflow项目下example/pipeline/base_pipeline示例。
- 示例链接:base_pipeline
from paddleflow.pipeline import ContainerStep
from paddleflow.pipeline import Pipeline
from paddleflow.pipeline import Parameter
from paddleflow.pipeline import PF_RUN_ID
def preprocess():
""" data preprocess step
"""
step = ContainerStep(
name="preprocess",
docker_env="centos:centos7",
parameters={"data_path": f"./base_pipeline/data/{PF_RUN_ID}"},
env={"USER_ABC": "123_{{PF_USER_NAME}}"},
command="bash base_pipeline/shells/data.sh {{data_path}}"
)
return step
def train(epoch, train_data):
""" train step
"""
step = ContainerStep(
name="train",
command="bash base_pipeline/shells/train.sh {{epoch}} {{train_data}} {{model_path}}",
parameters={
"epoch": epoch,
"model_path": f"./output/{PF_RUN_ID}",
"train_data": train_data
}
)
return step
def validate(model_path):
""" validate step
"""
step = ContainerStep(
name="validate",
command="bash base_pipeline/shells/validate.sh {{model_path}}",
parameters={"model_path": model_path}
)
return step
@Pipeline(name="base_pipeline", docker_env="nginx:1.7.9", parallelism=1)
def base_pipeline(epoch=5):
""" base pipeline
"""
pre_step = preprocess()
train_step = train(epoch, pre_step.parameters["data_path"])
validate_step = validate(train_step.parameters["model_path"])
导入DSL相关模块
与编写任何Python脚本一样,我们首先要导入将会使用到的模块、类、或者函数等。Python DSL提供的模块、类、函数等都可以通过paddleflow.pipeline模块完成导入,如上面的示例所示:
from paddleflow.pipeline import ContainerStep
from paddleflow.pipeline import Pipeline
from paddleflow.pipeline import Parameter
from paddleflow.pipeline import PF_RUN_ID
定义节点
在DSL中,节点通过实例化ContainerStep来实现,您需要定义多少个节点,便需要实例化多少个ContainerStep实例。如上面示例中,函数preprocess, train, validate均会返回一个ContainerStep的实例。 ContainerStep实例化函数的主要参数说明如下:
字段名称 | 字段类型 | 字段含义 | 是否必须 | 备注 |
name | string | 节点的名字 | 是 | 需要满足如下约束: * 只能由字母,数字,中划线组成,且以字母开头 *对应正则表达式: ^[a-zA-Z][a-zA-Z0-9-]{0,29}$ |
command | string | Step需要执行的任务 | 否 | |
docker_env | string | docker镜像地址 | 否 | 优先级:节点级别的docker_env参数 > 全局级别的docker_env参数。 |
parameters | dict[str, Union[int, string, float, Parameter]] | Step运行参数,在创建任务之前便需要确定其参数值 | 否 | key即为 parameter的名字,需要满足如下的约束: * 只能由字母数字下划线组成,且以字母或下划线开头 * ^[A-Za-z_][A-Za-z0-9_]{0,49}$ value即为parameter的值,在yaml中定义的值将视为其默认值,发起任务时,可以通过接口参数进行更改 |
env | dict[str, str] | 节点运行任务时的环境变量 | 否 | key为环境变量的名字,需满足如下约束: * 只能由字母数字下划线组成,且以字母或下划线开头 * ^[A-Za-z_][A-Za-z0-9_]{0,49}$ value即为环境变量的值 |
- 与ContainerStep相关的更多说明,可以点击这里查看
定义pipeline
在完成了所有的节点定义后,便可以开始将这些Step有机的组装成一个pipeline。将ContainerStep组装成pipeline,可以分成以下三步:
- 实例化Pipeline对象
- 将ContainerStep实例添加至Pipeline实例中
- 指定ContainerStep实例间的依赖关系
接下来,我们将依次介绍这三个步骤。
实例化Pipeline对象
在将ContainerStep实例添加至Pipeline实例前,我们需要先实例化相关的Pipeline对象。这里需要特别注意的是,Pipeline是一个类装饰器,需要将其作为一个函数的装饰器去进行实例化,如上面的示例所示:
@Pipeline(name="base_pipeline", docker_env="nginx:1.7.9", parallelism=1)
def base_pipeline(epoch=5):
""" base pipeline
"""
... ...
Pipeline实例化函数的主要参数说明如下:
字段名 | 含义 | 类型 | 是否必须 | 默认值 | 备注 |
name | pipeline的名字 | str | 是 | 只能由字母数字下划线组成,且以字母或下划线开头
* 正则表达式: ^[A-Za-z_][A-Za-z0-9_]{0,49}$ |
|
docker_env | docker 镜像地址 | str | 否 | 优先级:节点级别的docker_env参数 > 全局级别的docker_env参数。 | |
parallelism | 单次pipeline 任务,最大可以同时运行的节点数 | int | 否 | 10 | 默认是10,目前最大不能超过20。
同一个pipeline,发起的不同run,parallelism相互独立。
实际节点运行并发度,也可能会受底层资源影响 |
- 关于Pipeline实例化函数的详细说明,请点击这里。
将ContainerStep实例添加至Pipeline实例中
在完成了Pipeline对象的实例化后, 接下来便需要将ContainerStep实例添加至Pipeline实例中,添加方式很简单:
-
只需在pipeline函数中完成Step的实例化即可。
- pipeline函数: 指被Pipeline实例装饰的函数。
如在上面的示例所示, 在pipeline函数 base_pipeline()中,依次调用了 preprocess(), train(), validate() 三个函数,而在这三个函数中,均完成了一个 ContainerStep对象的实例化。因此,此时的Pipeline实例将会包含有三个ContainerStep实例,其名字依次为:"preprocess"、"train"、"validate"。
指定ContainerStep实例间的依赖关系
在Paddleflow Pipeline中,ContainerStep实例之间可以存在如下的依赖关系:
- 流程依赖: 如果StepA需要在StepB之后运行,则称StepA在流程上依赖于StepB。
- Parameter依赖: 如果StepA的某个Parameter引用了StepB的某个Parameter,则称StepA在Parameter上依赖于StepB。
定义流程依赖
- 流程依赖的定义方式非常简单,只需要调用ContainerStep实例的after()方法即可, 如下所示:
stepA.after(stepB)
通过该语句,便定义了stepA与stepB之间的流程依赖关系:stepA在流程上依赖于stepB。
Parameter依赖
在某些情况下,StepA 的某个Parameter["P1"]需要使用StepB的Parameter["P2"]的值,此时我们便可以定义Parameter参数依赖,定义方式也很简单,直接给StepA的参数Parameter["P1"]赋值为StepB的Parameter["P2"]的引用即可,示例代码如下:
StepA.parameters["P1"] = StepB.parameters["P2"]
在运行StepA时,Parameter["P1"]的值将会被替换为StepB的Parameter["P2"]的值
一些细心的用户应该早已发现,在上面的pipeline示例中,存在有如下的参数依赖:
- train_step的Parameter["train_data"] 依赖于preprocess_step的Parameter["data_path"]
- validate_step的Parameter["model_path"]依赖于train_step的Parameter["model_path"]
运行任务
当完成pipeline定义后,您便可以使用定义好的pipeline来发起pipeline任务。
为了能正常发起Pipeline任务,您需要提前做好如下几个步骤:
- 创建fs,详情请参考PaddleFlow存储介绍
- 将任务运行所需文件存放到fs的正确路径下
在本小节中,将会假定您已经创建了一个名为ppl的PaddleFlow存储, 并且已经将base_pipeline下的所有文件存放到了该存储的base_pipeline路径下
运行yaml规范定义的Pipeline
当您使用yaml规范定义好了pipeline,并把其存放在PaddleFlow存储之后,您只需要执行如下的命令便可以创建一个pipeline任务:
paddleflow run create -f ppl -yp base_pipeline/run.yaml
运行该命令后, 你会看到如下的信息:
此时,便表明您已经成功的创建了一个pipeline任务,其id为run-000291
关于paddleflow命令行的更详细的说明,请查看命令行使用规范
运行DSL规范定义的Pipeline
当您使用DSL规范定义好您的pipeline后,如果您需要使用该pipeline创建pipeline任务,您只需在python脚本中,调用Pipeline实例的run()函数即可。如下所示:
if __name__ == "__main__":
ppl = base_pipeline()
print(ppl.run(fs_name="ppl"))
然后您通过python执行该脚本,即可发起pipeline 任务, 如下图所示:
更多
至此,您应该应该已经学会了PaddleFlow Pipeline 最基础的使用,如果您想跟深入了解,您可以点击下方链接: