数据目的地
数据目的地
数据目的地是规则接收到设备消息,并通过规则路由转发到的最终数据操作,目的地可以是存储、计算、消息队列等云产品,目前规则引擎支持如下几种数据目的地操作。
- 发布到另一个 MQTT 主题
- 写入百度消息服务 for Kafka(支持共享版、专享版)
- 写入用户自定义 Kafka(自建 Kafka 或跨账号百度消息服务 for Kafka)
- 存储到百度时序数据库 TSDB
- 存储到百度对象存储 BOS
- 转发给百度函数计算 CFC
- 转发给用户自定义 HTTP 目的地
发布到另一个 MQTT 主题
您可以设置将订阅接收到的数据,转发到另一个 Topic 中,实现 M2M 或者更多其他场景。
规则引擎支持将消息转发给当前账号下任意 IoT Core 的 Topic。目的地 Topic 可以是任何合法的主题(不能包含通配符,如+、#),规则引擎有权限将消息转发到任何主题,但如果需要订阅该主题,请确保您使用的设备或应用有该主题的订阅权限,请务必事先在设备模板中配置设备的主题权限。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到规则指定的另一个主题中。 然后,拥有数据目的地主题订阅权限的设备或应用服务器可以订阅消息。
这种方式优势是可以方便、快速的将消息在设备与设备之间、设备与服务之间、服务与服务之间流转,以实现设备场景联动、远程下发指令、实时订阅设备状态数据等业务应用需求。

操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
目的地是名称为 core001 的 IoT Core 下的 destination/device001 主题
设备端或应用端订阅 core001 下的 destination/device001 主题可接收到上述消息。
发布到另一个动态的 MQTT 主题
您可以设置将订阅接收到的数据,转发给动态 MQTT 主题目的地,实现复杂的 M2M 或者更多其他场景。
动态 MQTT 主题:动态 MQTT 主题是指通过设置的数据查询语法和函数表达式对消息内容计算,针对结果取值得到的 MQTT 主题
动态主题使用消息变换后内容为基础进行处理。
规则引擎支持将消息转发给当前账号下任意 IoT Core 的 Topic。目的地 Topic 可以是任何合法的主题(不能包含通配符,如+、#),规则引擎有权限将消息转发到任何主题,但如果需要订阅该主题,请确保您使用的设备或应用有该主题的订阅权限,请务必事先在设备模板中配置设备的主题权限。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到规则指定的另一个动态主题中。 然后,拥有数据目的地主题订阅权限的设备或应用服务器可以订阅消息。
这种方式具有灵活方便的优点,可以快速实现消息在设备与设备之间、设备与服务之间、服务与服务之间流转,以实现设备场景联动、远程下发指令、实时订阅设备状态数据等业务应用需求。
操作流程
例如规则引擎订阅接收到如下消息:
1{
2 "temperature": 36.2,
3 "to": "device002"
4}
Topic 表达式 'destination/' & to
目的地是名称为 core001 的 IoT Core 下的destination/device002主题
设备端或应用端订阅 core001 下的destination/device002主题可接收到上述消息。
写入百度消息服务 for Kafka
您可以使用规则引擎将设备数据转发到百度消息服务 for Kafka(原名百度消息服务 BMS)主题中,服务端再从 Kafka 主题中消费消息,实现设备端与服务端之间高性能的消息闭环传输。
百度消息服务 for Kafka 支持共享版和专享版。配置规则目的地时,共享版 Kafka 主题可直接选择;专享版 Kafka 需要先在“数据目的地管理”中添加专享版 Kafka 目的地后再引用。
共享版 Kafka
将设备消息转发到当前账号下的百度消息服务 for Kafka 共享集群的指定主题。添加规则目的地时选择区域和主题后即可作为规则目的地使用,无需额外配置。
专享版 Kafka
专享版 Kafka 适用于将设备消息转发到当前账号下的专享版百度消息服务 for Kafka 集群。在规则中使用前,需要先在“数据目的地”中添加专享版 Kafka 目的地,配置区域、专享版集群、Topic、连接协议和认证信息。详细添加步骤见《数据目的地管理》章节。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到消息服务的主题中。 然后,您的应用服务器调用消息服务的接口消费消息。
此方式使用消息队列保证消息的可靠性,避免了服务端不可用时导致消息丢失。同时,消息服务在处理大量消息并发时,有削峰填谷的作用,保证服务端不会因为突然的并发压力导致服务不可用。物联网平台与消息服务的结合,可以实现设备端与服务端之间高性能的消息闭环传输。

操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
假设目的地是名称为广州区域下的 kafka 主题 device2service
应用服务器通过消费 device2service 主题可实时处理设备数据,当服务器故障时,kafka 中未消费的数据可以在服务恢复后继续处理。
写入用户自定义 Kafka
除了将消息转发到当前账号下的百度消息服务 for Kafka 实例外,也可以使用规则引擎将设备数据转发到用户自建的、规则引擎网络可达且支持 JKS 证书认证的 Kafka 集群,或其他百度云账号下的百度消息服务for Kafka 主题中,实现设备消息跨用户流转。
当设备实时数据需要共享给其他用户时,可由数据接收方提供自己账号下的 Kafka 主题,并在设备接入账号下的规则引擎中设置为数据目的地。
使用用户自定义 Kafka 时,需要先将对应 Kafka 添加到数据目的地列表中,之后在配置数据目的地时选中。详细添加步骤见《数据目的地管理》章节。
数据转发流程
与写入消息服务 for Kafka 一致
操作流程
与写入消息服务 for Kafka 一致
存储到时序数据库 TSDB
您可以配置规则,将数据转发到时序数据库 TSDB 的实例中存储。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到时序数据库 TSDB。 然后,您的应用服务器调用 TSDB 的查询接口,获取设备的历史数据。
时序数据库是最适合存储设备属性、状态类时序数据的数据库,设备数据持久化存储到 TSDB,业务应用从数据库中查询数据完成业务操作。

操作流程
写入时序数据的数据格式必须符合时序数据库写入接口的要求,即设备上报的原始消息经过查询语句的转换之后必须符合如下示例格式,才能正常写入,否则不能被成功写入 TSDB。具体的转换语句可参考《常用查询语句示例》章节。
写入 TSDB 数据格式示例
1{
2 "datapoints": [{
3 "metric": "cpu_idle",
4 "tags": {
5 "host": "server1",
6 "rack": "rack1"
7 },
8 "timestamp": 1465376157007,
9 "value": 51
10 }, {
11 "metric": "cpu_idle",
12 "tags": {
13 "host": "server2",
14 "rack": "rack2"
15 },
16 "values": [
17 [1465376269769, 67],
18 [1465376325057, 60]
19 ]
20 }]
21}
数据格式说明
| 参数名称 | 参数类型 | 是否必须 | 说明 |
|---|---|---|---|
| datapoints | List |
必须 | datapoint 列表,由 Datapoint 对象组成的数组 |
Datapoint对象
| 参数名称 | 参数类型 | 是否必须 | 说明 |
|---|---|---|---|
| metric | String | 必须 | metric 的名称 |
| field | String | 可选 | field 的名称,默认名称为 value。不同的 field 支持不同的数据类型写入。对于同一个 field,如果写入了某个数据类型的 value 之后,相同的 field 不允许写入其他数据类型 |
| tags | Object | 必须 | data point 对应的所有 tag,Object 中的一对 key-value 表示一个 tag 的 key-value |
| type | String | 可选 | 目前支持 Long/Double/String/Bytes。代表 value 字段的类型,如果不填会根据解析出来的类型为准。bytes 是种特殊类型,表示 value 是经过 base64 编码后的 String,TSDB 存储时会反编码成 byte 数组存储 |
| timestamp | Int | 可选 | Unix 时间戳,单位是毫秒;如果 timestamp 为空,value 不为空,timestamp 自动填入系统当前时间;如果 timestamp 的位数小于等于 10 位,将认为精度是秒,自动乘以1000;timestamp+value 与 values 两者必须二选一 |
| value | Int/Double/String | 可选 | data point 的值,timestamp+value 与 values 两者必须二选一。当写入的 metric、field、tags、timestamp 都相同时,后写入的 value 会覆盖先写入的 value |
| values | List<List |
可选 | 对于相同的 metric+tags 的 data point,可以通过合并成一个 values 的 List 来减少 payload,values 是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是 unix 时间戳,类型是 Int,第二个元素是 value,类型是 Int/Double/String;如果 timestamp 的位数小于等于10位,将认为精度是秒,自动乘以1000 |
存储到百度对象存储 BOS
您可以配置规则,将数据转发到百度对象存储 BOS 目的地,以实现设备消息存储,适用于消息体为文件或者日志等形态。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到对象存储 BOS 的指定 bucket 根目录中。消息存储方式支持按时间聚合,也支持单个消息存储为一个文件。 然后,您的应用服务器可以调用 BOS 的查询接口,获取设备的历史数据。

操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
规则过滤条件和查询语句设置为空(消息透传给对象存储 BOS 服务,实际使用中可以编写查询语句,最终规则会将输出结果转发给指定的函数)。那么最终该消息会被存储在指定的 bucket 的{iotCoreId}/{rulechainId}/{time}/{messageId}目录下。
如果选择的存储方式是按照时间聚合,那么多个消息会以指定的聚合周期存储为一个文件,多个消息之间被指定的分隔符分开。
目前支持以每分钟/每小时进行聚合,消息分隔符支持 Unix 换行符、制表符、竖线,或者其他自定义的10字符以内的字符串。
为避免对 BOS 目的地造成吞吐压力,建议存储方式为按时间聚合,需注意单个 BOS 的大小存在上限,请选择合适的聚合时间。
但如果您的使用场景确实需要将单个消息存储为一个文件,请发起工单,申请开通此类存储方式的功能白名单。
转发到函数计算 CFC
您可以配置规则,将数据转发到函数计算 CFC中对数据进行进一步处理,例如解析二进制数据、发短信或邮件给告警接收人、将数据转存储到关系型数据库等。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到指定的函数中。 然后,您可以在函数中接收到经过规则引擎转换的数据,进行进一步处理。

操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
规则过滤条件和查询语句设置为空(消息透传给CFC函数,实际使用中可以编写查询语句,最终规则会将输出结果转发给指定的函数)
目的地是名称为 print_log 的函数
print_log 函数中可接收到上述消息,规则引擎会将输出结果映射到函数的输入 event 参数,将event.message 进行 base64 解析后即可获得规则引擎输出的结果消息。
event 参数的说明如下
| 参数名称 | 参数类型 | 说明 |
|---|---|---|
| version | String | 目的地协议版本号,当前为v1 |
| messageId | String | 平台生成的消息唯一 ID |
| source | Object | 原始数据来源信息 |
| rulechainId | String | 转发消息的规则 ID |
| message | String | 规则处理得到结果经过 base64 处理后的字符串 |
| timestamp | Int | 规则引擎向 CFC 转发数据的时间,取规则引擎服务时间戳,精确到毫秒 |
source对象说明
| 参数名称 | 参数类型 | 说明 |
|---|---|---|
| type | String | 原始数据来源类型,当前固定为 IOT_CORE,表示来源于 IoT Core |
| iotCoreId | String | 消息来源的 IoTCore ID |
| topic | String | 消息来源 topic |
| clientId | String | 消息来源的 clientId |
函数接收到的event对象示例
1{
2 "version":"v1",
3 "messageId":"32456438592711743",
4 "source":{
5 "type":"IOT_CORE",
6 "iotCoreId":"abcdefg",
7 "topic":"abc/test",
8 "clientId":"123456"
9 },
10 "rulechainId":"0e357fac55b64bfa8c5c7e9fd56ee320",
11 "message":"MA==",
12 "timestamp":922669472162659
13}
写入用户自定义 HTTP 目的地
您可以使用规则引擎将设备消息以 POST 的方式,转发给用户自定义 HTTP 服务中,服务接收消息后进行处理。转发至 HTTP 目的地消息体与上述转发至函数计算 CFC 消息体一致。
数据转发流程
设备发布消息到 IoT Core 中, IoT Core 通过规则引擎将消息进行处理并转发给指定的 HTTP 服务。

此方式通过常见的 HTTP 请求方式扩展消息接收方,以便业务进一步进行消息处理。
过高频次的消息可能导致您的 HTTP 服务无法正确响应,请根据业务消息频次合理选择目的地。
一般情况下,不建议基于 HTTP 进行高频消息传递(如超过100 QPS),若消息并发量较大,可采用上述中 Kafka 等方案。
操作流程
- 添加目的地并验证所有权
添加自定义的用户 HTTP 目的地后,需要进行所有权验证,确认所有权后方可进行数据传输。
点击「数据目的地」菜单,选择「添加目的地」,填写对应信息; 在目的地列表页面,点击验证,获取验证字符串; 将此验证字符串作为对应 URL 的 GET 返回,点击「验证」,即可完成验证。
若验证失败可访问对应 URL 检查字符串返回是否正确,重试。
验证字符串有效期为 72 小时,请在此时间内完成;若超时请删除后重新创建。
- 数据传输 完成所有权验证后,可在已有或新建的规则详情中,选择并添加已验证的 HTTP 目的地,即可完成配置,符合过滤条件的消息将被 POST 至对应 URL。
POST 消息体与 CFC 目的地中数据格式一致。
| 参数名称 | 参数类型 | 说明 |
|---|---|---|
| type | String | 原始数据来源类型,当前固定为 IOT_CORE,表示来源于 IoT Core |
| iotCoreId | String | 消息来源的 IoTCore ID |
| topic | String | 消息来源 topic |
| clientId | String | 消息来源的 clientId |
1{
2 "version":"v1",
3 "messageId":"32456438592711743",
4 "source":{
5 "type":"IOT_CORE",
6 "iotCoreId":"abcdefg",
7 "topic":"abc/test",
8 "clientId":"123456"
9 },
10 "rulechainId":"0e357fac55b64bfa8c5c7e9fd56ee320",
11 "message":"MA==",
12 "timestamp":922669472162659
13}
注意:2022-08-25 前创建的规则,HTTP 目的地推送会将上述格式再次 base64 编码。
评价此篇文章
