baetyl-rule
更新时间:2023-05-22
简介
baetyl-rule 可以实现 baetyl 框架端侧的消息流转,在 baetyl-broker (端侧消息中心)、函数服务、Iot Hub (云端 mqtt broker),http服务,消息队列kafka,rabbit mq,对象存储进行消息交换。
支持以下的消息流转方式:
消息源:
- 订阅来自MQTT的消息
- 接收来自HTTP的请求
消息处理:
- 调用函数计算(此步骤可以省略)
消息目的地:
- 将处理后的结果发送至mqtt
- 发送至http服务
- 发送至消息队列kafka
- 发送至rabbit mq
- 解析消息内容,发送本地文件至s3对象存储
其中 baetyl 支持 Python、Node、SQL 等多种运行时,可以配置相关的脚本函数对消息进行过滤、处理、转换以及丰富等。
配置
baetyl-rule 的全量配置文件如下,并对配置字段做了相应解释:
- mqtt配置
clients: # 消息节点,可以从消息节点订阅消息,也可以发送至消息节点
- name: iothub # 名称
kind: mqtt # mqtt 类型
address: 'ssl://u7isgiz.mqtt.iot.bj.baidubce.com:1884' # 地址
username: test # 用户名 (可选)
password: test # 密码 (可选)
ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA(可选)
key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥(可选)
cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥(可选)
insecureSkipVerify: true # 是否跳过服务端证书校验
rules: # 消息规则
- name: rule1 # 规则名称,必须保持唯一
source: # 消息源
topic: broker/topic1 # 消息主题
qos: 1 # 消息质量
target: # 消息目的地
client: iothub # 消息节点,如果不设置,默认为 baetyl-broker
topic: iothub/topic2 # 消息主题
qos: 0 # 消息质量
function: # 处理函数
name: node85 # 函数名称
- http配置
clients:
- name: http-server # 名称
kind: http-server # http-server类型(仅可配置为source)
port: 8090 # http server服务端口,下面的tls可选,不配置时,默认为http服务
ca: /var/lib/baetyl/testcert/ca.pem # 服务器的 CA
key: /var/lib/baetyl/testcert/client.key # 服务器的私钥
cert: /var/lib/baetyl/testcert/client.pem # 服务器的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
- name: http-client # 名称
kind: http # http类型(仅可配置为target)
address: 'http://127.0.0.1:8554' # http服务地址,如果是https,请配置证书,否则默认使用系统证书
ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA
key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥
cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥
insecureSkipVerify: true # 是否跳过服务端证书校验
rules:
- name: rule2 # 规则名称,必须保持唯一
source: # 消息源
client: http-server # 指定为http-server的消息源
target: # 消息目的地
client: http-client # 与clients中配置的http服务名称一致
path: /nodes/test # http访问路径
method: PUT # http消息类型,支持GET/POST/PUT/DELETE
- s3对象存储配置
clients:
- name: minio # s3 client名称
kind: s3 # 当前,仅支持作为target
address: '127.0.0.1:8686' # s3服务地址
bucket: 'bie-upload' # s3指定bucket
ak: 'test' # 连接的ak
sk: 'test' # 连接的sk
token: 'test' # 连接的token(可选)
region: us-east-1 # 区域(可选)
rules:
- name: rule3
source:
topic: shadow/put
qos: 0
target:
client: minio # 与clients中配置的s3服务名称一致
s3与消息队列的配置实例中,source都使用了baetyl-broker进行展示。
在BIE私有化中,提供sts鉴权,同时,你可以直接在target中指定为baetyl-sts
,无需自行设置s3连接信息,文件会自动上传到namespace/nodename/
路径下。
使用s3上传文件,需要保证请求mqtt或者http发送的消息内容为application/json,内容格式如下:
{
"type":"UPLOAD", // 定值,表示进行s3上传
"content":{
"localPath":"/Users/hanpengfei01/Desktop/ingress.txt", // 需要上传文件的本地文件地址,如果是容器模式,请确保该文件目录已挂载
"remotePath":"ingress.txt" // 云端的上传文件地址
}
}
- kafka, rabbit mq消息队列配置
clients:
- name: mq-target # rabbit-mq client名称
kind: rabbit-mq # 当前,仅支持作为target
address: '127.0.0.1:5672' # amqp服务地址
username: test # 用户名
password: test # 密码
- name: kafka-target # kafka client名称
kind: kafka # 当前,仅能作为target使用
address: ['127.0.0.1:9092'] # broker地址数组
saslType: plain # sasl鉴权类型,当前支持plain,scram256,scram512,置空则不开启鉴权(可选)
username: test # 用户名 (可选)
password: test # 密码 (可选)
ca: /var/lib/baetyl/testcert/ca.pem # 连接节点的 CA(可选)
key: /var/lib/baetyl/testcert/client.key # 连接节点的私钥(可选)
cert: /var/lib/baetyl/testcert/client.pem # 连接节点的公钥(可选)
rules:
- name: rule4
source:
topic: shadow/put
qos: 0
target:
client: mq-target # rabbit-mq client名称
exchange: bie_events # exchange名称
routingKey: my_routing_key # routingKey名称
- name: rule5
source:
topic: shadow/put
qos: 0
target:
client: kafka-target # kafka client名称
topic: bie-test # kafka topic
说明:
- baetyl-rule 后台默认添加边缘系统应用baetyl-broker作为一个消息节点
- 当一条rule规则的
source/target
未配置client
字段时,会默认使用 baetyl-broker 作为其消息节点- 当一个client消息节点的kind为
http
时,若address
连接地址使用https,默认使用baetyl-core签发的系统证书- http类型消息节点只能作为一条rule的target,并且http请求的Content-Type为
application/json
- http-server类型仅可作为rule的source,且该类型仅可存在一个配置,用户调用时,使用POST请求访问地址
http://{ip}:{port}/rules/{ruleName}
来触发调用
Demo示例
纯消息流转
下面示例定义了两条规则
- edge2iotcore:将边缘侧edgetopic1的消息,转发至云端iotcore,mqtt小的qos=1
- edge2edge:将边缘侧edgetopic1的消息,转发至边缘侧edgetopic2,mqtt消息的qos=1
clients:
- name: iotcore
kind: mqtt
address: 'tcp://aeccdhl.iot.gz.baidubce.com:1883'
username: xxxcdhl/device6
password: lfWjOxxxmvXPpM
rules:
- name: edge2iotcore
source:
topic: edgetopic1
qos: 1
target:
client: iotcore
topic: topic1
qos: 1
- name: edge2edge
source:
topic: edgetopic1
qos: 1
target:
topic: edgetopic2
qos: 1
logger:
level: debug
encoding: console
消息流转+函数计算
下面示例定义了3条规则
- rule1:订阅
broker/topic1
消息,将消息作为函数py-demo1/func1
的输入,将函数计算结果输出至broker/topic2
- rule2:订阅
broker/topic3
消息,将消息作为函数py-demo1/func2
的输入,将函数计算结果通过http POST请求发送至http://10.68.23.42:8554/rule/result
clients:
- name: http-server
kind: http
address: 'http://10.68.23.42:8554'
rules:
- name: rule1
source:
topic: broker/topic1
target:
topic: broker/topic2
function:
name: py-demo1/func1
- name: rule2
source:
topic: broker/topic3
target:
client: http-server
path: /rule/result
method: POST
function:
name: py-demo1/func2
logger:
level: debug
encoding: console
更新节点影子
节点影子相关说明可以参考该章节节点影子。 但是由于节点影子的更新仅提供了https接口,因此,可以通过baetyl-rule来路由mqtt消息,从而达成通过mqtt消息更新节点影子的需求。
在云端设置节点影子:
baetyl-rule参考配置如下:
clients:
- name: core
kind: http
address: 'https://baetyl-core.baetyl-edge-system'
rules:
- name: shadow
source:
topic: shadow/put
qos: 0
target:
client: core
path: /node/properties
method: PUT
logger:
level: debug
encoding: console
使用mqtt box连接到mqtt broker后,向定义的topic shadow/put
中发送需要修改的值。
回到云端,可以看到,该节点影子值已被修改。