搜索本产品文档关键词
百度消息服务触发器使用示例
所有文档
menu
没有找到结果,请重新输入

函数计算 CFC

百度消息服务触发器使用示例

百度消息服务触发器说明

百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。

如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新纪录。

百度消息服务触发器创建

用户可以为新建的函数或已有函数配置百度消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。

这里假设您已经创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 CFC 控制台在函数管理页面中为函数配置百度消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。

编写处理函数

  1. 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
  2. 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。

在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。

# -*- coding: utf-8 -*-
import base64
import json
def handler(event, context): 
	for record in event['Records']:
    	# kafka value is base64 encoded so decode here
    	payload = base64.b64decode(record['Kafka']['Value'])
		print("Decoded payload: " + payload)
	return 'Successfully processed {} records.'.format(len(event['Records']))

注:百度消息服务触发器的event消息体

{
  "Records": [
    {
      "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
      "Kafka": {
        "Key": "",          //base64后的key
        "Value": "MA==",    //base64后的value
        "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
        "Partition": 0,
        "Offset": 43110,
        "Timestamp": 1553151704.6529999
      },
      "EventName": "bce:kafka:record",
      "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
      "EventSource": "bce:kafka"
    },
    {
      "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
      "Kafka": {
        "Key": "",
        "Value": "MQ==",
        "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
        "Partition": 0,
        "Offset": 43111,
        "Timestamp": 1553151704.6529999
      },
      "EventName": "bce:kafka:record",
      "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
      "EventSource": "bce:kafka"
    }
  ]
}

配置百度消息服务触发器

  1. 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
  2. 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
  3. 点击左侧导航栏中的“触发器”,进入函数配置页面。 图片
  4. 在函数配置页面中最下方点击“新增触发器”。 图片
  5. 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器。 图片
  6. 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。 - Topic 选择您将要监听的百度消息服务 Topic - 批处理大小:从 Topic 中一次读取的最大记录数,1-1000 - 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest - 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试 图片
  7. 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器及其信息 图片

测试触发器

模拟测试

  1. 点击测试按钮 图片
  2. 输入测试事件,并点击执行

    测试内容如下

    	{
      "Records": [
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
          "Kafka": {
            "Key": "",
            "Value": "MA==",
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43110,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        },
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
          "Kafka": {
            "Key": "",
            "Value": "MQ==",
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43111,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        }
      ]
    }

图片 3. 查看返回的执行结果 图片

真实测试

在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。

图片

并且在触发器的界面您也可以看到最后的执行结果

图片

上一篇
DuEdge触发器
下一篇
CDN触发器使用示例