数据转发到用户 Kafka
数据转发到用户 Kafka
一、简介
本文档介绍如何通过 IoT Core 规则引擎,将设备消息转发至用户 Kafka 目的地,实现跨账号消息转发、写入自建 Kafka 集群或写入当前账号下专享版百度消息服务 for Kafka 集群等场景。
规则引擎中的“用户 Kafka”包括以下两类目的地:
- 专享版 Kafka:当前账号下的专享版百度消息服务 for Kafka 集群。
- 自定义 Kafka:用户自建 Kafka,或其他百度云账号下的百度消息服务 for Kafka。
使用用户 Kafka 前,需要先在 IoT Core 的“数据目的地管理”中添加对应 Kafka 目的地,添加成功并通过连接测试后,才能在规则中选择并使用。
其他关于规则引擎转发到当前账号百度消息服务 for Kafka 共享版主题的功能,请参考数据转发到百度消息服务 for Kafka。
阅读本示例前,建议先熟悉 IoT Core 快速入门文档。
二、场景描述
用户 Kafka 适用于以下场景:
- 写入专享版 Kafka:设备消息需要写入当前账号下的专享版百度消息服务 for Kafka 集群,用于满足专享集群、网络隔离或更高规格的消息处理需求。
- 写入自定义 Kafka:设备消息需要写入用户自建的、规则引擎网络可达且支持 JKS 证书认证的 Kafka 集群。
- 跨账号写入百度消息服务 for Kafka:设备实时数据需要共享给其他百度云账号,由数据接收方提供其账号下的 Kafka 主题和证书信息,数据发送方在 IoT Core 规则引擎中配置为用户 Kafka 目的地。
配置完成后,数据流向如下:
1设备 -> IoT Core -> 规则引擎 -> 用户 Kafka
三、注意事项
- 专享版 Kafka 使用前,需要先在百度消息服务 for Kafka 控制台为对应集群开启产品间转储,并确认所选连接协议已开放可用接入地址。未开启产品间转储时,系统无法获取可用内网接入地址,连接测试和规则写入都会失败。
- 专享版 Kafka 的连接地址由系统根据所选集群和连接协议自动获取,用户无需手动填写。
- 自定义 Kafka 当前支持 JKS 证书认证。使用百度消息服务 for Kafka 作为自定义 Kafka 时,可从对应主题下载证书包,并从证书包中获取可信证书、私钥及对应密码。
- 用户 Kafka 目的地添加成功后,建议先执行连接测试。连接测试失败时,规则向该目的地写入数据同样可能失败。
- 规则引擎写入 Kafka 时暂不支持指定分区。
四、操作步骤
4.1 创建 IoT Core 实例
进入物联网核心套件 IoT Core 控制台后,进行如下操作:
- 在“实例列表”中点击“创建 IoT Core”。
- 填写名称、描述后,点击“提交”。

当创建的 IoT Core 实例处于“运行中”状态时,表示实例创建成功。

4.2 添加模板
- 点击已创建的 IoT Core 实例名称,进入实例详情页面。
- 在左侧导航栏选择“模板”,点击“添加模板”。

- 填写模板名称,点击“确认”完成添加。

默认创建的模板包含预设主题,其中 {deviceName} 是占位符,实际设备使用时需要替换为设备名称。也可以进入模板详情,按需添加自定义主题。

4.3 新增设备
- 点击已创建的 IoT Core 实例名称,进入实例详情页面。
- 在左侧导航栏选择“设备列表”,点击“新增设备”。

- 填写名称、认证方式、描述和模板后,点击“提交”。
- 认证方式选择“证书认证”时,可参考使用证书鉴权建立 MQTT 连接。
- 认证方式选择“密钥认证”时,可参考通过 MQTT 连接及收发消息。
- 设备名称用于替换主题中的
{deviceName}占位符。

4.4 添加用户 Kafka 目的地
在配置规则前,需要先进入“数据目的地管理”添加用户 Kafka 目的地。
- 进入 IoT Core 实例详情页面。
- 在左侧导航栏选择“规则引擎”下的“数据目的地”。
- 点击“添加目的地”。
- 选择数据目的地类型为“用户 Kafka”。
- 根据实际使用场景选择“专享版 Kafka”或“自定义 Kafka”。
4.4.1 添加专享版 Kafka
专享版 Kafka 适用于将设备消息写入当前账号下的专享版百度消息服务 for Kafka 集群。
添加前请确认:
- 已在百度消息服务 for Kafka 控制台创建专享版 Kafka 集群和 Topic。
- 已为对应集群开启产品间转储。
- 已确认需要使用的连接协议已开放可用接入地址。
填写配置信息:
| 配置项 | 说明 |
|---|---|
| 区域 | 专享版百度消息服务 for Kafka 集群所在区域。 |
| 集群 | 需要写入的专享版 Kafka 集群。建议选择运行状态正常的集群。 |
| 主题 | 需要写入的 Kafka 主题。 |
| 连接协议 | 支持 SSL 和 PLAINTEXT。选择的协议需要与集群开放的接入协议一致。 |
| 可信证书 | 选择 SSL 协议时必填,仅支持 .jks 文件。选择 PLAINTEXT 协议时无需填写。 |
| 可信证书密码 | 选择 SSL 协议时必填。选择 PLAINTEXT 协议时无需填写。 |
| 私钥 | 选择 SSL 协议时必填,仅支持 .jks 文件。选择 PLAINTEXT 协议时无需填写。 |
| 私钥密码 | 选择 SSL 协议时必填。选择 PLAINTEXT 协议时无需填写。 |

点击“提交”完成添加。添加成功后,可以在数据目的地列表中执行“连接测试”。

说明:
- 专享版 Kafka 的连接地址由系统根据所选集群和连接协议自动获取。
- 如果连接测试失败,请检查集群状态、产品间转储是否开启,以及所选连接协议是否与集群开放的接入协议一致。
4.4.2 添加自定义 Kafka
自定义 Kafka 适用于将设备消息写入用户自建的、规则引擎网络可达且支持 JKS 证书认证的 Kafka 集群,或其他百度云账号下的百度消息服务 for Kafka 主题。
使用其他百度云账号下的百度消息服务 for Kafka 时,需要数据接收方提供对应 Kafka 主题和证书包。进入百度消息服务 for Kafka 控制台后,可在证书相关页面下载密钥文件,并解压获得后续配置需要的证书和密码。

填写配置信息:
| 配置项 | 说明 |
|---|---|
| 认证方式 | 仅支持 JKS 证书认证。 |
| 连接地址 | Kafka 的连接地址和端口号,格式为 host:port。多个 broker 地址使用英文逗号分隔。 |
| 可信证书 | 可信证书,仅支持 .jks 文件。使用百度消息服务 for Kafka 时,可从证书包中获取 client.truststore.jks。 |
| 可信证书密码 | 可信证书密码。使用百度消息服务 for Kafka 时,可从证书包中的 client.properties 获取 ssl.truststore.password。 |
| 私钥 | 私钥,仅支持 .jks 文件。使用百度消息服务 for Kafka 时,可从证书包中获取 client.keystore.jks。 |
| 私钥密码 | 私钥密码。使用百度消息服务 for Kafka 时,可从证书包中的 client.properties 获取 ssl.keystore.password。 |
| 主题 | 需要写入的 Kafka 主题。 |
| 描述 | 可选,对目的地的描述信息。 |
4.5 创建并配置规则
- 点击“规则引擎”下的“规则列表”。若还未创建规则,请先点击“创建规则”进行创建。

- 创建完成后,点击“编辑调试”进入规则配置页面。
- 配置数据来源,填写设备消息来源 Topic。例如:
1$iot/+/events
- 添加数据目的地,选择“用户 Kafka”,并选择 4.4 中已添加的 Kafka 目的地。
- 配置查询语句。
如果不需要对消息做格式转换,可填写:
1$
表示规则引擎不进行格式转换,直接将原始 JSON 消息转发到用户 Kafka。


- 点击“保存”,完成规则配置。
- 返回规则列表,点击“启用”运行规则。

4.6 测试
完成配置后,可以测试数据流向是否符合预期:
1设备 -> IoT Core -> 规则引擎 -> 用户 Kafka
可以通过 MQTT.fx 等 MQTT 模拟器向 IoT Core 发送测试消息。MQTT.fx 的使用方式可参考使用 MQTT 模拟器连接及收发消息。
发送测试消息后,进入对应 Kafka 主题查看监控指标或消费消息内容。如果 Kafka 主题写入指标增长,说明设备消息已通过规则引擎成功转发到用户 Kafka。

评价此篇文章
