baetyl-rule
更新时间:2023-05-22
简介
baetyl-rule 可以实现 baetyl 框架端侧的消息流转,在 baetyl-broker (端侧消息中心)、函数服务、Iot Hub (云端 mqtt broker),http服务,消息队列kafka,rabbit mq,对象存储进行消息交换。
支持以下的消息流转方式:
消息源:
- 订阅来自MQTT的消息
- 接收来自HTTP的请求
消息处理:
- 调用函数计算(此步骤可以省略)
消息目的地:
- 将处理后的结果发送至mqtt
- 发送至http服务
- 发送至消息队列kafka
- 发送至rabbit mq
- 解析消息内容,发送本地文件至s3对象存储
其中 baetyl 支持 Python、Node、SQL 等多种运行时,可以配置相关的脚本函数对消息进行过滤、处理、转换以及丰富等。
配置
baetyl-rule 的全量配置文件如下,并对配置字段做了相应解释:
- mqtt配置
YAML
1clients: # 消息节点,可以从消息节点订阅消息,也可以发送至消息节点
2 - name: iothub # 名称
3 kind: mqtt # mqtt 类型
4 address: 'ssl://u7isgiz.mqtt.iot.bj.baidubce.com:1884' # 地址
5 username: test # 用户名 (可选)
6 password: test # 密码 (可选)
7 ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA(可选)
8 key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥(可选)
9 cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥(可选)
10 insecureSkipVerify: true # 是否跳过服务端证书校验
11rules: # 消息规则
12 - name: rule1 # 规则名称,必须保持唯一
13 source: # 消息源
14 topic: broker/topic1 # 消息主题
15 qos: 1 # 消息质量
16 target: # 消息目的地
17 client: iothub # 消息节点,如果不设置,默认为 baetyl-broker
18 topic: iothub/topic2 # 消息主题
19 qos: 0 # 消息质量
20 function: # 处理函数
21 name: node85 # 函数名称
- http配置
YAML
1clients:
2 - name: http-server # 名称
3 kind: http-server # http-server类型(仅可配置为source)
4 port: 8090 # http server服务端口,下面的tls可选,不配置时,默认为http服务
5 ca: /var/lib/baetyl/testcert/ca.pem # 服务器的 CA
6 key: /var/lib/baetyl/testcert/client.key # 服务器的私钥
7 cert: /var/lib/baetyl/testcert/client.pem # 服务器的公钥
8 insecureSkipVerify: true # 是否跳过服务端证书校验
9 - name: http-client # 名称
10 kind: http # http类型(仅可配置为target)
11 address: 'http://127.0.0.1:8554' # http服务地址,如果是https,请配置证书,否则默认使用系统证书
12 ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA
13 key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥
14 cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥
15 insecureSkipVerify: true # 是否跳过服务端证书校验
16rules:
17 - name: rule2 # 规则名称,必须保持唯一
18 source: # 消息源
19 client: http-server # 指定为http-server的消息源
20 target: # 消息目的地
21 client: http-client # 与clients中配置的http服务名称一致
22 path: /nodes/test # http访问路径
23 method: PUT # http消息类型,支持GET/POST/PUT/DELETE
- s3对象存储配置
YAML
1clients:
2 - name: minio # s3 client名称
3 kind: s3 # 当前,仅支持作为target
4 address: '127.0.0.1:8686' # s3服务地址
5 bucket: 'bie-upload' # s3指定bucket
6 ak: 'test' # 连接的ak
7 sk: 'test' # 连接的sk
8 token: 'test' # 连接的token(可选)
9 region: us-east-1 # 区域(可选)
10rules:
11 - name: rule3
12 source:
13 topic: shadow/put
14 qos: 0
15 target:
16 client: minio # 与clients中配置的s3服务名称一致
s3与消息队列的配置实例中,source都使用了baetyl-broker进行展示。
在BIE私有化中,提供sts鉴权,同时,你可以直接在target中指定为baetyl-sts
,无需自行设置s3连接信息,文件会自动上传到namespace/nodename/
路径下。
使用s3上传文件,需要保证请求mqtt或者http发送的消息内容为application/json,内容格式如下:
JSON
1{
2 "type":"UPLOAD", // 定值,表示进行s3上传
3 "content":{
4 "localPath":"/Users/hanpengfei01/Desktop/ingress.txt", // 需要上传文件的本地文件地址,如果是容器模式,请确保该文件目录已挂载
5 "remotePath":"ingress.txt" // 云端的上传文件地址
6 }
7}
- kafka, rabbit mq消息队列配置
YAML
1clients:
2 - name: mq-target # rabbit-mq client名称
3 kind: rabbit-mq # 当前,仅支持作为target
4 address: '127.0.0.1:5672' # amqp服务地址
5 username: test # 用户名
6 password: test # 密码
7 - name: kafka-target # kafka client名称
8 kind: kafka # 当前,仅能作为target使用
9 address: ['127.0.0.1:9092'] # broker地址数组
10 saslType: plain # sasl鉴权类型,当前支持plain,scram256,scram512,置空则不开启鉴权(可选)
11 username: test # 用户名 (可选)
12 password: test # 密码 (可选)
13 ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA(可选)
14 key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥(可选)
15 cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥(可选)
16rules:
17 - name: rule4
18 source:
19 topic: shadow/put
20 qos: 0
21 target:
22 client: mq-target # rabbit-mq client名称
23 exchange: bie_events # exchange名称
24 routingKey: my_routing_key # routingKey名称
25 - name: rule5
26 source:
27 topic: shadow/put
28 qos: 0
29 target:
30 client: kafka-target # kafka client名称
31 topic: bie-test # kafka topic
说明:
- baetyl-rule 后台默认添加边缘系统应用baetyl-broker作为一个消息节点
- 当一条rule规则的
source/target
未配置client
字段时,会默认使用 baetyl-broker 作为其消息节点- 当一个client消息节点的kind为
http
时,若address
连接地址使用https,默认使用baetyl-core签发的系统证书- http类型消息节点只能作为一条rule的target,并且http请求的Content-Type为
application/json
- http-server类型仅可作为rule的source,且该类型仅可存在一个配置,用户调用时,使用POST请求访问地址
http://{ip}:{port}/rules/{ruleName}
来触发调用
Demo示例
纯消息流转
下面示例定义了两条规则
- edge2iotcore:将边缘侧edgetopic1的消息,转发至云端iotcore,mqtt小的qos=1
- edge2edge:将边缘侧edgetopic1的消息,转发至边缘侧edgetopic2,mqtt消息的qos=1
YAML
1clients:
2 - name: iotcore
3 kind: mqtt
4 address: 'tcp://aeccdhl.iot.gz.baidubce.com:1883'
5 username: xxxcdhl/device6
6 password: lfWjOxxxmvXPpM
7rules:
8 - name: edge2iotcore
9 source:
10 topic: edgetopic1
11 qos: 1
12 target:
13 client: iotcore
14 topic: topic1
15 qos: 1
16 - name: edge2edge
17 source:
18 topic: edgetopic1
19 qos: 1
20 target:
21 topic: edgetopic2
22 qos: 1
23
24logger:
25 level: debug
26 encoding: console
消息流转+函数计算
下面示例定义了3条规则
- rule1:订阅
broker/topic1
消息,将消息作为函数py-demo1/func1
的输入,将函数计算结果输出至broker/topic2
- rule2:订阅
broker/topic3
消息,将消息作为函数py-demo1/func2
的输入,将函数计算结果通过http POST请求发送至http://10.68.23.42:8554/rule/result
YAML
1clients:
2 - name: http-server
3 kind: http
4 address: 'http://10.68.23.42:8554'
5rules:
6 - name: rule1
7 source:
8 topic: broker/topic1
9 target:
10 topic: broker/topic2
11 function:
12 name: py-demo1/func1
13 - name: rule2
14 source:
15 topic: broker/topic3
16 target:
17 client: http-server
18 path: /rule/result
19 method: POST
20 function:
21 name: py-demo1/func2
22logger:
23 level: debug
24 encoding: console
更新节点影子
节点影子相关说明可以参考该章节节点影子。 但是由于节点影子的更新仅提供了https接口,因此,可以通过baetyl-rule来路由mqtt消息,从而达成通过mqtt消息更新节点影子的需求。
在云端设置节点影子:
baetyl-rule参考配置如下:
YAML
1clients:
2 - name: core
3 kind: http
4 address: 'https://baetyl-core.baetyl-edge-system'
5rules:
6 - name: shadow
7 source:
8 topic: shadow/put
9 qos: 0
10 target:
11 client: core
12 path: /node/properties
13 method: PUT
14logger:
15 level: debug
16 encoding: console
使用mqtt box连接到mqtt broker后,向定义的topic shadow/put
中发送需要修改的值。
回到云端,可以看到,该节点影子值已被修改。