边缘规则实例
规则引擎最先被应用在物联网云平台中。针对大量的物联网设备数据,规则引擎通过支持灵活的规则设置,对设备传上云端的设备时序数据、进行过滤、丰富、变换、动作、推送至不同的数据目的地(如时序数据库TSDB、Kafka、对象存储BOS等)以达到不同的业务目标。物联网规则引擎是基于事件驱动的数据流处理引擎,支持多种规则定义方式和描述方式。
在边缘计算场景下,要求边缘框架具有一定的自治能力,其中也包括对于端侧消息流具有一定的处理能力,可以实现消息的本地过滤、统计,再上传云端。边缘规则引擎可以实现端侧的消息路由能力,您可以设置边缘消息规则,控制本地消息在边缘计算节点中的流转,从而实现数据的安全可控。
1、名词解释
baetyl-broker: 单机版消息订阅和发布中心,采用 MQTT3.1.1 协议,是 Baetyl 框架端侧的消息中间件,为所有服务提供消息驱动的互联能力。
Iot Core: 云端 MQTT Broker,具体指一个具体的 Endpoint, 可以使用域名以及用户凭证进行连接,收发 MQTT 消息。
函数:Baetyl 框架的 Node、Python、Sql运行时,可以通过编写相应脚本进行消息的过滤、处理、转换以及丰富,可参考相关资料 baetyl-function 。
本文将以 Sql 运行时为例进行说明。
2、新建IoT Core云端实例
参考百度云IotCore文档 新建Iot Core实例并进行相关的设备模板配置。配置成功后,使用 MQTT Box 进行连接,其中 $iot/test/user/# 主题是我们自定义的具有收发权限的用户主题。
3、BIE侧配置
3.1、BIE云端纳管边缘节点
在云端新建边缘节点并安装到边缘设备,需要安装baetyl-rule
和baetyl-function
两个系统应用。
安装成功后如下所示:
在端侧节点运行如下命令,可看到:
3.2、配置 baetyl-broker
编辑 baetyl-broker 配置,暴露一个外部端口供测试使用。
然后编辑配置文件,配置文件如下:
listeners:
- address: tcp://0.0.0.0:8003
principals:
- username: test
password: hahaha
permissions:
- action: pub
permit: ["#"]
- action: sub
permit: ["#"]
session:
sysTopics:
- $link
- $baetyl
logger:
level: debug
encoding: console
新增 8003 端口供测试使用,并且需要设置映射宿主机端口以供连接。
然后在本地使用MqttBox连接baetyl-broker,测试连通性。
3.3、安装 process SQL处理函数
一段 process 处理函数如下:
select * where uid > 7
新建相关的配置项,注意添加运行时标签 SQL。
然后新建函数应用,并添加服务 process,引用上述配置项,并通过标签匹配到我们实现创建的节点上。
3.4、配置 baetyl-rule 边缘规则
数据的流转规则我们要配置在 baetyl-rule 模块中,所以这一步要修改 baetyl-rule 相关的配置。
配置文件如下:
clients:
- name: iotcore
kind: mqtt
address: 'tcp://arncpan.iot.gz.baidubce.com:1883'
username: arncpan/test
password: xxx
rules:
- name: rule1
source:
topic: broker/user/a
qos: 1
target:
client: iotcore
topic: $iot/test/user/a
qos: 1
function:
name: process
logger:
level: debug
encoding: console
密码根据实际场景进行替换。
4、测试边缘规则与边缘SQL函数
我们向 baetyl-broker 发消息,测试 iotcore 的主题是否能收到消息,并且测试 Sql 函数是否起到数据过滤作用。
baetyl-broker
iotcore
可以看到 iotcore 收到了消息。
接下来测试 uid < 7 的场景。
baetyl-broker
iotcore
可以看到 uid < 7 的消息没有被发送到 iotcore,而是被 Sql 函数过滤掉了。
5、测试边缘消息缓存与断线续传
边缘 baetyl-broker 具有离线缓存消息,端点续传的能力,对于 QoS1 的消息,baetyl-broker 会先持久化,再给订阅方发送消息,当订阅方回复 ACK 的时候,baetyl-broker 会删除这条持久化消息。接下来我们来测试这一能力。
首先我们断开本地网络,然后往边缘 baetyl-broker 发送 QoS1 的消息:
可以看到发送到边缘 baetyl-broker 成功。
在正常联网情况下,该条消息将经过 baetyl-rule 被转发到云端 Iot Core,但是此时处于断网环境,baetyl-rule 无法把消息转发到 Iot Core。观察 mqttbox 中 Iot Core 的连接,因为网络处于断开状态。
此时恢复网络,可以观察到 mqttbox 中 Iot Core 的连接恢复,订阅 $iot/test/user/a
主题可以收到之前边缘断网情况下发送的消息,证明本地 baetyl-broker 具有消息缓存、断点续传的能力。