baetyl-rule
更新时间:2023-01-03
简介
baetyl-rule 可以实现 baetyl 框架端侧的消息流转,在 baetyl-broker (端侧消息中心)、函数服务、Iot Hub (云端 mqtt broker),http服务进行消息交换。
支持以下的消息流转方式:
- 订阅来自mqtt的消息。
- 调用函数计算(此步骤可以省略)
- 将处理后的结果发送至mqtt或调用http服务。
其中 baetyl 支持 Python、Node、SQL 等多种运行时,可以配置相关的脚本函数对消息进行过滤、处理、转换以及丰富等。
配置
baetyl-rule 的全量配置文件如下,并对配置字段做了相应解释:
clients: # 消息节点,可以从消息节点订阅消息,也可以发送至消息节点
- name: iothub # 名称
kind: mqtt # mqtt 类型
address: 'ssl://u7isgiz.mqtt.iot.bj.baidubce.com:1884' # 地址
username: client.example.org # 用户名
ca: ../example/var/lib/baetyl/testcert/ca.pem # 连接节点的 CA
key: ../example/var/lib/baetyl/testcert/client.key # 连接节点的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 连接节点的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
- name: http-client # 名称
kind: http # http类型(仅可配置为target)
address: 'http://127.0.0.1:8554' # http服务地址,如果是https,请配置证书,否则默认使用系统证书
ca: ../example/var/lib/baetyl/testcert/ca.pem # 连接节点的 CA
key: ../example/var/lib/baetyl/testcert/client.key # 连接节点的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 连接节点的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
- name: http-server # 名称
kind: http-server # http-server类型(仅可配置为source)
port: 8090 # http server服务端口,下面的tls可选,不配置时,默认为http服务
ca: ../example/var/lib/baetyl/testcert/ca.pem # 服务器的 CA
key: ../example/var/lib/baetyl/testcert/client.key # 服务器的私钥
cert: ../example/var/lib/baetyl/testcert/client.pem # 服务器的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
rules: # 消息规则
- name: rule1 # 规则名称,必须保持唯一
source: # 消息源
topic: broker/topic1 # 消息主题
qos: 1 # 消息质量
target: # 消息目的地
client: iothub # 消息节点,如果不设置,默认为 baetyl-broker
topic: iothub/topic2 # 消息主题
qos: 0 # 消息质量
function: # 处理函数
name: node85 # 函数名称
- name: rule2 # 规则名称,必须保持唯一
source: # 消息源
topic: broker/topic5 # 消息主题
qos: 0 # 消息质量
target: # 消息目的地
client: http-client # 与clients中配置的http服务名称一致
path: /nodes/test # http访问路径
method: PUT # http消息类型,支持GET/POST/PUT/DELETE
- name: rule3 # 规则名称,必须保持唯一
source: # 消息源
client: http-server # 指定为http-server的消息源
target: # 消息目的地
client: http-client # 与clients中配置的http服务名称一致
path: /nodes/test # http访问路径
method: PUT # http消息类型,支持GET/POST/PUT/DELETE
说明:
- baetyl-rule 后台默认添加边缘系统应用baetyl-broker作为一个消息节点
- 当一条rule规则的
source/target
未配置client
字段时,会默认使用 baetyl-broker 作为其消息节点- 当一个client消息节点的kind为
http
时,若address
连接地址使用https,默认使用baetyl-core签发的系统证书- http类型消息节点只能作为一条rule的target,并且http请求的Content-Type为
application/json
- http-server类型仅可作为rule的source,且该类型仅可存在一个配置,用户调用时,使用POST请求访问地址
http://{ip}:{port}/rules/{ruleName}
来触发调用
Demo示例
纯消息流转
下面示例定义了两条规则
- edge2iotcore:将边缘侧edgetopic1的消息,转发至云端iotcore,mqtt小的qos=1
- edge2edge:将边缘侧edgetopic1的消息,转发至边缘侧edgetopic2,mqtt消息的qos=1
clients:
- name: iotcore
kind: mqtt
address: 'tcp://aeccdhl.iot.gz.baidubce.com:1883'
username: xxxcdhl/device6
password: lfWjOxxxmvXPpM
rules:
- name: edge2iotcore
source:
topic: edgetopic1
qos: 1
target:
client: iotcore
topic: topic1
qos: 1
- name: edge2edge
source:
topic: edgetopic1
qos: 1
target:
topic: edgetopic2
qos: 1
logger:
level: debug
encoding: console
消息流转+函数计算
下面示例定义了3条规则
- rule1:订阅
broker/topic1
消息,将消息作为函数py-demo1/func1
的输入,将函数计算结果输出至broker/topic2
- rule2:订阅
broker/topic3
消息,将消息作为函数py-demo1/func2
的输入,将函数计算结果通过http POST请求发送至http://10.68.23.42:8554/rule/result
clients:
- name: http-server
kind: http
address: 'http://10.68.23.42:8554'
rules:
- name: rule1
source:
topic: broker/topic1
target:
topic: broker/topic2
function:
name: py-demo1/func1
- name: rule2
source:
topic: broker/topic3
target:
client: http-server
path: /rule/result
method: POST
function:
name: py-demo1/func2
logger:
level: debug
encoding: console
更新节点影子
节点影子相关说明可以参考该章节节点影子。 但是由于节点影子的更新仅提供了https接口,因此,可以通过baetyl-rule来路由mqtt消息,从而达成通过mqtt消息更新节点影子的需求。
在云端设置节点影子:
baetyl-rule参考配置如下:
clients:
- name: core
kind: http
address: 'https://baetyl-core.baetyl-edge-system'
rules:
- name: shadow
source:
topic: shadow/put
qos: 0
target:
client: core
path: /node/properties
method: PUT
logger:
level: debug
encoding: console
使用mqtt box连接到mqtt broker后,向定义的topic shadow/put
中发送需要修改的值。
回到云端,可以看到,该节点影子值已被修改。