百度消息服务触发器
更新时间:2024-07-05
百度消息服务触发器说明
百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。
如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新记录。
百度消息服务目前分为共享版和专享版两类,两者的区别您可参考百度消息服务相关文档 消息服务 for Kafka,目前推荐使用百度消息服务专享版创建触发器。
如果您使用百度消息服务专享版创建触发器,需要在创建消息服务集群时开启网络配置中的产品间转存储功能,触发器才能正常工作,共享版无相关限制
百度消息服务触发器创建
用户可以为新建的函数或已有函数配置百度消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。
这里假设您已经使用Python2.7创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 CFC 控制台在函数管理页面中为函数配置百度消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。
编写处理函数
- 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
- 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。
在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。
# -*- coding: utf-8 -*-
import base64
import json
def handler(event, context):
for record in event['Records']:
# kafka value is base64 encoded so decode here
payload = base64.b64decode(record['Kafka']['Value'])
print("Decoded payload: " + payload)
return 'Successfully processed {} records.'.format(len(event['Records']))
注:百度消息服务触发器的event消息体
{
"Records": [
{
"EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
"Kafka": {
"Key": "", //base64后的key
"Value": "MA==", //base64后的value
"Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
"Partition": 0,
"Offset": 43110,
"Timestamp": 1553151704.6529999
},
"EventName": "bce:kafka:record",
"EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
"EventSource": "bce:kafka"
},
{
"EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
"Kafka": {
"Key": "",
"Value": "MQ==",
"Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
"Partition": 0,
"Offset": 43111,
"Timestamp": 1553151704.6529999
},
"EventName": "bce:kafka:record",
"EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
"EventSource": "bce:kafka"
}
]
}
配置百度消息服务触发器(共享版)
- 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
- 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
- 点击左侧导航栏中的“触发器”,进入函数配置页面。
- 在函数配置页面中最下方点击“新增触发器”。
- 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器。
-
之后在弹出框中配置好选项,并点击确认,完成触发器的创建。
- Topic 选择您将要监听的百度消息服务 Topic
- 批处理大小:从 Topic 中一次读取的最大记录数,1-1000
- 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest
- 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试
- 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器及其信息
配置百度消息服务触发器(专享版)
- 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
- 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
- 点击左侧导航栏中的“触发器”,进入函数配置页面。
- 在函数配置页面中最下方点击“新增触发器”。
- 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器(专享版)。
-
之后在弹出框中配置好选项,并点击确认,完成触发器的创建。
- Kafka集群: 选择您要使用的百度消息服务专享版集群名
- Topic: 选择您将要监听的百度消息服务 Topic
- 批处理大小:从 Topic 中一次读取的最大记录数,1-1000
- 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest
- 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试
- 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器(专享版)及其信息
测试触发器
专享版与共享版触发器的使用在CFC侧相同,测试触发器流程一致
模拟测试
- 点击测试按钮
-
输入测试事件,并点击执行
测试内容如下
{ "Records": [ { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110", "Kafka": { "Key": "", "Value": "MA==", "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43110, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" }, { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111", "Kafka": { "Key": "", "Value": "MQ==", "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43111, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" } ] }
3. 查看返回的执行结果
真实测试
在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。
并且在触发器的界面您也可以看到最后的执行结果