集成Kuiper流式计算引擎
本文以一个常见的物联网使用场景为案例,介绍了如何利用边缘计算框架 Baetyl 来实现对业务的快速、低成本和有效地处理。
在各类物联网项目中,比如智能楼宇项目,需要采集和分析楼宇数据,如电梯、燃气、水电等。一种解决方案是将所有的设备直接接入在云端的物联网平台,类似于像 Baidu IoT Core 或者 AWS IoT Core。这种解决方案的问题在于:
- 数据处理时延较长:通过 Internet 传输和云端的处理后返回给设备,所需时间较长
- 数据传输和存储成本:通过 Internet 传输需要带宽,对于大规模连接的物联网项目来说,耗费的带宽会相当可观
- 数据的安全性:有些物联网的数据会相当敏感,全部通过物联网传输的话会有风险
为了解决以上的问题,业界提出了边缘计算的方案,边缘计算的核心就在于把数据进行就近处理,避免不必要的时延、成本和安全问题。开源框架 Baetyl 是百度贡献给 Linux 基金会的开源边缘计算框架,主推物联网场景下端侧的边缘计算解决方案。
本文将流处理模块 kuiper 部署到边缘计算框架 baetyl 上,对一段时间内边缘侧的设备消息进行流式处理,并将处理结果上传云端进行存储。
业务场景
假设现有一组设备,组中的每个设备有一个 ID,通过 MQTT 协议往 MQTT 消息服务器上相应的主题发送数据。主题的设计如下,其中 {device_id} 为设备的 ID。
devices/{device_id}/messages
每个设备发送的数据格式为 JSON,发送的通过该传感器采集的温度与湿度数据。
{
"temperature": 30,
"humidity" : 20
}
现在需要实时分析数据,并提出以下需求:对每个设备的温度数据按照每 10 秒钟计算平均值(t_av),并且记下 10 秒钟内的最大值 (t_max)、最小值(t_min) 和数据条数(t_count),计算完毕后将这 4 个结果进行保存,以下为样例结果数据:
[
{
"device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2
},
{
"device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2
},
...
]
方案介绍
如下图所示,我们将在 baetyl 边缘计算框架上,采用边缘分析/流式数据处理的方式,从 baetyl-broker 订阅相关设备消息,最后将处理结果输出到 Baidu 的 IoT Core 中。
baetyl-broker 是 Baetyl 框架端侧的消息中间件,采用 MQTT3.1.1 协议,可在低带宽、不可靠网络中提供可靠的消息传输服务。
kuiper 是基于 SQL 的轻量级边缘流式数据分析引擎,安装包只有约 7MB,非常适合于运行在边缘设备端。
Baidu Iot Core 提供了比较全的设备接入和数据分析的方案,此处用于云端的结果数据接入,以及应用所需的结果数据分析。
安装 baetyl 计算框架
在云端新建边缘节点并安装到边缘设备。安装成功后如下所示:
新建 Iot Core 实例
参考百度云IoTCore文档 新建Iot Core实例并进行相关的设备模板配置。配置成功后,使用 MqttBox 进行连接,其中 $iot/test/user/# 主题是我们自定义的具有收发权限的用户主题。
配置 baetyl-broker
编辑 baetyl-broker 配置,暴露一个外部端口供测试使用。
然后编辑配置文件,配置文件如下:
listeners:
- address: tcp://0.0.0.0:8003
principals:
- username: test
password: hahaha
permissions:
- action: pub
permit: ["#"]
- action: sub
permit: ["#"]
session:
sysTopics:
- $link
- $baetyl
logger:
level: debug
encoding: console
新增 8003 端口供测试使用。 并且需要设置映射宿主机端口以供连接。
然后在本地使用MqttBox连接baetyl-broker,来测试连通性。
安装 kuiper
从 Kuiper 官方镜像仓库镜像仓库选取 Kuiper 的官方 Docker 镜像,这里选取的是:
emqx/kuiper:0.5.1-alpine
然后创建容器服务,并添加 kuiper 服务,设置镜像、添加端口映射以及环境变量。
MQTT_BROKER_ADDRESS=baetyl-broker.baetyl-edge-system:8003
MQTT_BROKER_USERNAME=test
MQTT_BROKER_PASSWORD=hahaha
用户在端侧可以通过 Telnet 命令来判断边缘设备上 Kuiper 是否启动成功。
更多 kuiper 资料可以参考 kuiper 官网 。
安装集成 kuiper 插件
kuiper 原生的 stream、rule 创建都是通过 Http 请求,为了适配 baetyl 平台,可以使用 kuiper 推出的适配插件: kuiper-kubernetes-tool ,支持从配置文件加载 stream、rule 配置。
从 kuiper 官方镜像仓库镜像仓库选取 kuiper 的官方 Docker 镜像,这里选取的是:
emqx/kuiper-kubernetes-tool:0.5.1
我们在新建 kuiper 插件应用时,先新建对应的配置文件。
创建流语法解析
创建流的目的是为了定义发送到该流上的数据格式,类似于在关系数据库中定义表的结构。 kuiper 中所有支持的数据类型,可以参考 kuiper 官网 。
{
"commands":[
{
"url":"/streams",
"description":"create stream1",
"method":"post",
"data":{
"sql":"create stream demo (temperature float, humidity bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\");"
}
}
]
}
上述语句在 kuiper 中创建了一个名为 demo 的流定义,包含了两个字段,分别为 temperature 和 humidity,数据源为订阅 MQTT 的主题 devices/+/messages,这里请注意采用了通配符 +,用于订阅不同设备的消息。
数据业务逻辑处理语法解析
kuiper 采用 SQL 实现业务逻辑,每10秒钟统计温度的平均值、最大值、最小值和次数,并根据设备 ID 进行分组,实现的 SQL 如下所示。
{
"commands":[
{
"url":"/rules",
"description":"create rule1",
"method":"post",
"data":{
"id":"rule1",
"sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://arncpan.iot.gz.baidubce.com:1883",
"topic": "$iot/test/user/b",
"protocol_version": "3.1.1",
"qos": 0,
"clientId": "demo_001",
"username": "arncpan/test",
"password": "xxxx"
}
}
]
}
}]
}
这里的 SQL 用了四个聚合函数,用于统计在10秒钟窗口期内的相关值。
- avg: 平均值
- max: 最大值
- min: 最小值
- count: 计数
另外还使用了几个基本的函数:
- mqtt: 消息中取出 MQTT 协议的信息,mqtt(topic) 就是取得当前取得消息的主题名称
- split_value: 该函数将第一个参数使用第二个参数进行分割,然后第三个参数指定下标,取得分割后的值。所以函数 split_value("devices/001/messages", "/", 1)调用就返回001
- GROUP BY 跟的是分组的字段,分别为计算字段 device_id;时间窗口 TUMBLINGWINDOW(ss, 10),该时间窗口的含义为每10秒钟生成一批统计数据。
actions 列表中的 mqtt 类型的 action 的相关配置信息是 Iot Core 的连接信息。这里注意替换 IotCore 的连接信息。
创建命令配置项
将上述两步的语法填写到配置项中。 创建配置项如下:
创建配置信息配置项
配置信息用于 kuiper-kubernetes-tool 连接 kuiper 模块,其中指定了 kuiper 的 ip、port 等信息。
port: 9081
timeout: 500
intervalTime: 30
ip: "kuiper"
logPath: "log/kuiper.log"
commandDir: "sample"
关于配置详情可以参考 kuiper-kubernetes-tool 文档 。其中 9081 端口是 kuiper 默认的 Restful API 端口。
创建配置项如下:
创建 kuiper-tool 应用
新建容器服务,并添加 kuiper-kubernetes-tool 服务,设置镜像、添加上两步的配置项。
如果上述步骤都安装正确,在边缘设备执行如下命令,可以得到如下结果:
测试
我们使用 Mqtt Box 模拟设备向事先约定的 Topic 主题发送消息,观察 Iot Core 是否可以收到流式处理的结果。
我们分别向 Baetyl-Broker 发送两条消息:
{"temperature": 30, "humidity" : 80}
{"temperature": 60, "humidity" : 80}
预期 10s 后 Iot Core 会收到如下消息:
[{"device_id":"device_001","t_av":45,"t_count":2,"t_max":60,"t_min":30}]
实际操作:
向 baetyl-broker 发送两条消息。
IotCore 查看:
如上图,符合预期。
此时观察端上应用的资源消耗:
可以看出流式处理引擎 Kuiper 只消耗了极小的内存和CPU。
通过本文,读者可以基于 Baetyl 边缘计算框架快速集成 Kuiper 流式处理引擎,快速搭建边缘侧的流式解决方案,灵活地开发出基于边缘数据分析的系统,实现数据的低时延、低成本和安全的处理。