循环(foreach)节点
概述
foreach 节点用于遍历数据集,给数据集中的每个元素都执行一次定义好的子流程,类似编程语言中的 for
循环。数据集是一个 JSON 数组对象。
foreach 与其它节点一样,开始执行时仍会先根据 stateDataFilter
过滤输入数据,接下来会根据 inputCollection
参数,从过滤后的输入数据中取出或生成一个数据集。接下来就是从数据集中依次取出元素,结合 iterationParam
参数传给子流程并执行,元素间的执行是并行的。
在所有元素都执行结束后,foreach 把执行结果汇总到一个数组,根据 outputCollection
参数节点数据内并输出。若某个元素的执行失败了,则整个 foreach 节点也会失败。
处理流程可用如下伪码表示:
# stateInput: 节点输入数据
# stateOutput: 节点输入数据
stateInput = stateDataFilter(stateInput) # 过滤输入
dataSetInput = inputCollection(stateInput) # 生成数据集
dataSetOutput = []
for item in dataSetInput:
iteratorInput = {"value of iterationParam": item} # 生成传给子流程的数据
iteratorOutput = iterator(iteratorInput) # 用子流程执行每个元素(并行)
dataSetOutput.append(iteratorOutput) # 汇总执行结果
if outputCollection is not None: # 如果定义了 outputCollection 参数,则把执行结果添加到节点输出数据
stateOutput["value of outputCollection"] = dataSetOutput
stateOutput = stateDataFilter(stateOutput) # 过滤输入
参数
以下为 foreach 节点所包含的参数字段:
字段 | 类型 | 描述 |
---|---|---|
type(必需) | string | 节点类型,值为 "foreach" |
name(必需) | string | 节点名称 |
iterator(必需) | object | 子流程定义 |
inputCollection(必需) | string | jq 表达式,用来指定或生成数据集 |
iterationParam(必需) | string | 引用数据集中每个元素的变量名 |
outputCollection(可选) | string | jq 表达式,用于指定把遍历数据集的返回结果写入节点数据内的某个字段,若不设置则表示忽略返回结果 |
max(可选) | integer | 并行执行元素的最大并发数,默认为100 |
next(二选一) | string | 当前节点运行结束后,下一个要运行的节点 |
end(二选一) | bool | 设定 end 参数为 true,表示该节点运行结束后,没有继续要运行的节点。只可以定义 next 或 end 中的一个 |
description(可选) | string | 节点描述信息 |
stateDataFilter(可选) | object | 节点输入输出过滤,参考输入与输出 |
retry(可选) | object | 错误重试定义,参考错误重试 |
catch(可选) | object | 错误捕获定义,参考错误捕获 |
iterator
的定义与 parallel 节点中的 branch
类似,如下所示:
字段 | 类型 | 描述 |
---|---|---|
start(必需) | string | 分支的开始节点 |
states(必需) | array of state | 该分支的节点数组 |
示例
示例工作流定义如下,该 foreach 节点的逻辑是,从节点输入数据的 orders 数组中筛选出符合 ".completed == true" 条件的 order,每个 order 传给 iterator 中的函数去执行,最后把数据结果写入节点输出数据的 "confirmationresults" 字段。
name: demo
start: processOrder
states:
- name: processOrder
type: foreach
inputCollection: "[.orders[] | select(.completed == true)]"
iterationParam: completedorder
outputCollection: ".confirmationresults"
iterator:
start: sendConfirmation
states:
- type: operation
name: sendConfirmation
resource: "brn:bce:cfc:bj:123456:function:test:$LATEST"
end: true
假定工作流的输入是:
{
"orders": [
{
"orderNumber": "1234",
"completed": true,
"email": "firstBuyer@buyer.com"
},
{
"orderNumber": "5678",
"completed": true,
"email": "secondBuyer@buyer.com"
},
{
"orderNumber": "9910",
"completed": false,
"email": "thirdBuyer@buyer.com"
}
]
}
节点开始执行后,通过 inputCollection
的 jq 表达式,筛选出 completed 为 true 的元素,得到数据集:
[
{
"orderNumber": "1234",
"completed": true,
"email": "firstBuyer@buyer.com"
},
{
"orderNumber": "5678",
"completed": true,
"email": "secondBuyer@buyer.com"
}
]
接下来对数据集中的每个元素,根据 iterationParam 参数,在本例中其值为 "completedorder",构造出传给子流程的数据。
构造出的第一个数据如下所示,然后调用子流程,假如子流程返回结果 {"1234": "ok"}
。
{
"completedorder": {
"orderNumber": "1234",
"completed": true,
"email": "firstBuyer@buyer.com"
}
}
构造出的第二个数据如下,然后调用子流程,假如子流程返回结果 {"5678": "ok"}
。
{
"completedorder": {
"orderNumber": "5678",
"completed": true,
"email": "secondBuyer@buyer.com"
}
}
两次子流程结束后,汇总后得到的结果集为 [{"1234": "ok"}, {"5678": "ok"}]
。
本例中定义了 outputCollection: ".confirmationresults"
,说明需要把结果集放到节点输出数据里的 "confirmationresults" 字段里,该字段不存在时会被自动创建,最终节点输出数据为:
{
"orders": [
{
"orderNumber": "1234",
"completed": true,
"email": "firstBuyer@buyer.com"
},
{
"orderNumber": "5678",
"completed": true,
"email": "secondBuyer@buyer.com"
},
{
"orderNumber": "9910",
"completed": false,
"email": "thirdBuyer@buyer.com"
}
],
"confirmationresults": [{"1234": "ok"}, {"5678": "ok"}]
}
注:为什么需要设置 iterationParam:数据集中的元素可能是一个普通的字符串、数字等非 JSON 对象,为了保证传给子流程的数据为 JSON 对象,所以通过 iterationParam 参数进行构造。