数据目的地
数据目的地
数据目的地是规则接收到设备消息,并通过规则路由转发到的最终数据操作,目的地可以是存储、计算、消息队列等云产品,目前规则引擎支持如下几种数据目的地操作。
- 发布到另一个MQTT主题
- 写入百度消息服务for Kafka(原名百度消息服务BMS)
- 写入用户Kafka(跨账号百度消息服务for Kafka)
- 存储到时序数据库TSDB
- 转发给函数计算CFC
发布到另一个 MQTT 主题
您可以设置将订阅接收到的数据,转发到另一个Topic中,实现M2M或者更多其他场景。
规则引擎支持将消息转发给当前账号下任意IoT Core的Topic。目的地topic可以是任何合法的主题(不能包含通配符,如+、#),规则引擎有权限将消息转发到任何主题,但如果需要订阅该主题,请确保您使用的设备或应用有该主题的订阅权限,请务必事先在设备模板中配置设备的主题权限。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到规则指定的另一个主题中。 然后,拥有数据目的地主题订阅权限的设备或应用服务器可以订阅消息。
这种方式优势是可以方便、快速的将消息在设备与设备之间、设备与服务之间、服务与服务之间流转,以实现设备场景联动、远程下发指令、实时订阅设备状态数据等业务应用需求。
操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
目的地是名称为core001的IoT Core下的destination/device001主题
设备端或应用端订阅core001下的destination/device001主题可接收到上述消息。
发布到另一个动态的 MQTT 主题
您可以设置将订阅接收到的数据,转发给动态MQTT主题目的地,实现复杂的M2M或者更多其他场景。
动态MQTT主题:动态MQTT主题是指通过设置的数据查询语法和函数表达式对消息内容计算,针对结果取值得到的MQTT主题
动态主题使用消息变换后内容为基础进行处理。
规则引擎支持将消息转发给当前账号下任意IoT Core的Topic。目的地topic可以是任何合法的主题(不能包含通配符,如+、#),规则引擎有权限将消息转发到任何主题,但如果需要订阅该主题,请确保您使用的设备或应用有该主题的订阅权限,请务必事先在设备模板中配置设备的主题权限。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到规则指定的另一个动态主题中。 然后,拥有数据目的地主题订阅权限的设备或应用服务器可以订阅消息。
这种方式具有灵活方便的优点,可以快速实现消息在设备与设备之间、设备与服务之间、服务与服务之间流转,以实现设备场景联动、远程下发指令、实时订阅设备状态数据等业务应用需求。
操作流程
例如规则引擎订阅接收到如下消息:
{
"temperature": 36.2,
"to": "device002"
}
Topic表达式 'destination/' & to
目的地是名称为core001的IoT Core下的destination/device002
主题
设备端或应用端订阅core001下的destination/device002
主题可接收到上述消息。
写入消息服务 for Kafka
您可以使用规则引擎将设备数据转发到百度消息服务for Kafka(原名百度消息服务BMS)主题中,服务端再从消息服务主题中消费消息,实现设备端与服务端之间高性能的消息闭环传输。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到消息服务的主题中。 然后,您的应用服务器调用消息服务的接口消费消息。
此方式使用消息队列保证消息的可靠性,避免了服务端不可用时导致消息丢失。同时,消息服务在处理大量消息并发时,有削峰填谷的作用,保证服务端不会因为突然的并发压力导致服务不可用。物联网平台与消息服务的结合,可以实现设备端与服务端之间高性能的消息闭环传输。
操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
假设目的地是名称为广州区域下的kafka主题device2service
应用服务器通过消费device2service主题可实时处理设备数据,当服务器故障时,kafka中未消费的数据可以在服务恢复后继续处理。
写入用户Kafka
除了将消息转发到当前账号下消息服务for Kafka实例外,也可以使用规则引擎将设备数据转发到任意百度云账号下的Kafka主题中,服务端再从消息服务主题中消费消息,实现设备消息可以跨用户进行流转。
当设备实时数据需要共享给其他用户时,可由数据接收方提供自己账号下的Kafka主题,并在设备接入账号下的规则引擎中设置为数据目的地。
添加Kafka到数据目的地列表
使用其他账号下Kafka时,需要先将对应的Kafka添加到数据目的地列表中,之后在配置数据目的地时选中。
详细添加步骤见《数据目的地管理》章节。
数据转发流程
与写入消息服务 for Kafka一致
操作流程
与写入消息服务 for Kafka一致
存储到时序数据库TSDB
您可以配置规则,将数据转发到时序数据库TSDB的实例中存储。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到时序数据库TSDB。 然后,您的应用服务器调用TSDB的查询接口,获取设备的历史数据。
时序数据库是最适合存储设备属性、状态类时序数据的数据库,设备数据持久化存储到TSDB,业务应用从数据库中查询数据完成业务操作。
操作流程
写入时序数据的数据格式必须符合时序数据库写入接口的要求,即设备上报的原始消息经过查询语句的转换之后必须符合如下示例格式,才能正常写入,否则不能被成功写入TSDB。具体的转换语句可参考《常用查询语句示例》章节。
写入TSDB数据格式示例
{
"datapoints": [{
"metric": "cpu_idle",
"tags": {
"host": "server1",
"rack": "rack1"
},
"timestamp": 1465376157007,
"value": 51
}, {
"metric": "cpu_idle",
"tags": {
"host": "server2",
"rack": "rack2"
},
"values": [
[1465376269769, 67],
[1465376325057, 60]
]
}]
}
数据格式说明
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
datapoints | List |
必须 | datapoint列表,由Datapoint对象组成的数组 |
Datapoint对象
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
metric | String | 必须 | metric的名称 |
field | String | 可选 | field的名称,默认名称为value。不同的field支持不同的数据类型写入。对于同一个field,如果写入了某个数据类型的value之后,相同的field不允许写入其他数据类型 |
tags | Object | 必须 | data point对应的所有tag,Object中的一对key-value表示一个tag的key-value |
type | String | 可选 | 目前支持Long/Double/String/Bytes。代表value字段的类型,如果不填会根据解析出来的类型为准。bytes是种特殊类型,表示value是经过base64编码后的String,TSDB存储时会反编码成byte数组存储 |
timestamp | Int | 可选 | Unix时间戳,单位是毫秒;如果timestamp为空,value不为空,timestamp自动填入系统当前时间;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000;timestamp+value与values两者必须二选一 |
value | Int/Double/String | 可选 | data point的值,timestamp+value与values两者必须二选一。当写入的metric、field、tags、timestamp都相同时,后写入的value会覆盖先写入的value |
values | List<List |
可选 | 对于相同的metric+tags的data point,可以通过合并成一个values的List来减少payload,values是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是unix时间戳,类型是Int,第二个元素是value,类型是Int/Double/String;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000 |
转发到函数计算CFC
您可以配置规则,将数据转发到函数计算CFC中对数据进行进一步处理,例如解析二进制数据、发短信或邮件给告警接收人、将数据转存储到关系型数据库等。
数据转发流程
设备发布消息到物联网平台中,物联网平台通过规则引擎将消息进行处理并转发到指定的函数中。 然后,您可以在函数中接收到经过规则引擎转换的数据,进行进一步处理。
操作流程
例如规则引擎订阅接收到如下消息:
{ "temperature": 36.2 }
规则过滤条件和查询语句设置为空(消息透传给CFC函数,实际使用中可以编写查询语句,最终规则会将输出结果转发给指定的函数)
目的地是名称为 print_log 的函数
print_log 函数中可接收到上述消息,规则引擎会将输出结果映射到函数的输入 event 参数,将event.message进行base64解析后即可获得规则引擎输出的结果消息。
event参数的说明如下
参数名称 | 参数类型 | 说明 |
---|---|---|
version | String | 目的地协议版本号,当前为v1 |
messageId | String | 平台生成的消息唯一ID |
source | Object | 原始数据来源信息 |
rulechainId | String | 转发消息的规则ID |
message | String | 规则处理得到结果经过base64处理后的字符串 |
timestamp | Int | 规则引擎向CFC转发数据的时间,取规则引擎服务时间戳,精确到毫秒 |
source对象说明
参数名称 | 参数类型 | 说明 |
---|---|---|
type | String | 原始数据来源类型,当前固定为IOT_CORE,表示来源于IoT Core |
iotCoreId | String | 消息来源的IoTCore ID |
topic | String | 消息来源topic |
clientId | String | 消息来源的clientId |
函数接收到的event对象示例
{
"version":"v1",
"messageId":"32456438592711743",
"source":{
"type":"IOT_CORE",
"iotCoreId":"abcdefg",
"topic":"abc/test",
"clientId":"123456"
},
"rulechainId":"0e357fac55b64bfa8c5c7e9fd56ee320",
"message":"MA==",
"timestamp":922669472162659
}
写入用户 HTTP 目的地
您可以使用规则引擎将设备消息以 POST 的方式,转发给自有 HTTP 服务中,服务接收消息后进行处理。转发至 HTTP 目的地消息体与上述转发至函数计算 CFC 消息体一致。
数据转发流程
设备发布消息到 IoT Core 中, IoT Core 通过规则引擎将消息进行处理并转发给指定的 HTTP 服务。
此方式通过常见的 HTTP 请求方式扩展消息接收方,以便业务进一步进行消息处理。
过高频次的消息可能导致您的 HTTP 服务无法正确响应,请根据业务消息频次合理选择目的地。
一般情况下,不建议基于 HTTP 进行高频消息传递(如超过100 QPS),若消息并发量较大,可采用上述中 Kafka 等方案。
操作流程
- 添加目的地并验证所有权
添加自定义的用户 HTTP 目的地后,需要进行所有权验证,确认所有权后方可进行数据传输。
点击「数据目的地」菜单,选择「添加目的地」,填写对应信息; 在目的地列表页面,点击验证,获取验证字符串; 将此验证字符串作为对应 URL 的 GET 返回,点击「验证」,即可完成验证。
若验证失败可访问对应 URL 检查字符串返回是否正确,重试。
验证字符串有效期72小时,请在此时间内完成;若超时请删除后重新创建。
- 数据传输 完成所有权验证后,可在已有或新建的规则详情中,选择并添加已验证的 HTTP 目的地,即可完成配置,符合过滤条件的消息将被 POST 至对应 URL。
POST 消息体与 CFC 目的地中数据格式一致。
参数名称 | 参数类型 | 说明 |
---|---|---|
type | String | 原始数据来源类型,当前固定为IOT_CORE,表示来源于IoT Core |
iotCoreId | String | 消息来源的IoTCore ID |
topic | String | 消息来源topic |
clientId | String | 消息来源的clientId |
{
"version":"v1",
"messageId":"32456438592711743",
"source":{
"type":"IOT_CORE",
"iotCoreId":"abcdefg",
"topic":"abc/test",
"clientId":"123456"
},
"rulechainId":"0e357fac55b64bfa8c5c7e9fd56ee320",
"message":"MA==",
"timestamp":922669472162659
}
注意:2022-08-25 前创建的规则,HTTP 目的地推送会将上述格式再次 base64 编码。