百度消息服务触发器
所有文档

          函数计算 CFC

          百度消息服务触发器

          百度消息服务触发器说明

          百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。

          如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新纪录。

          百度消息服务触发器创建

          用户可以为新建的函数或已有函数配置百度消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。

          这里假设您已经创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 CFC 控制台在函数管理页面中为函数配置百度消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。

          编写处理函数

          1. 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
          2. 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。

          在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。

          # -*- coding: utf-8 -*-
          import base64
          import json
          def handler(event, context): 
          	for record in event['Records']:
              	# kafka value is base64 encoded so decode here
              	payload = base64.b64decode(record['Kafka']['Value'])
          		print("Decoded payload: " + payload)
          	return 'Successfully processed {} records.'.format(len(event['Records']))

          注:百度消息服务触发器的event消息体

          {
            "Records": [
              {
                "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
                "Kafka": {
                  "Key": "",          //base64后的key
                  "Value": "MA==",    //base64后的value
                  "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                  "Partition": 0,
                  "Offset": 43110,
                  "Timestamp": 1553151704.6529999
                },
                "EventName": "bce:kafka:record",
                "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                "EventSource": "bce:kafka"
              },
              {
                "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
                "Kafka": {
                  "Key": "",
                  "Value": "MQ==",
                  "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                  "Partition": 0,
                  "Offset": 43111,
                  "Timestamp": 1553151704.6529999
                },
                "EventName": "bce:kafka:record",
                "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                "EventSource": "bce:kafka"
              }
            ]
          }

          配置百度消息服务触发器

          1. 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
          2. 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
          3. 点击左侧导航栏中的“触发器”,进入函数配置页面。 图片
          4. 在函数配置页面中最下方点击“新增触发器”。 图片
          5. 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器。 图片
          6. 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。 - Topic 选择您将要监听的百度消息服务 Topic - 批处理大小:从 Topic 中一次读取的最大记录数,1-1000 - 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest - 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试 图片
          7. 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器及其信息 图片

          测试触发器

          模拟测试

          1. 点击测试按钮 图片
          2. 输入测试事件,并点击执行

            测试内容如下

            	{
              "Records": [
                {
                  "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
                  "Kafka": {
                    "Key": "",
                    "Value": "MA==",
                    "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                    "Partition": 0,
                    "Offset": 43110,
                    "Timestamp": 1553151704.6529999
                  },
                  "EventName": "bce:kafka:record",
                  "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                  "EventSource": "bce:kafka"
                },
                {
                  "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
                  "Kafka": {
                    "Key": "",
                    "Value": "MQ==",
                    "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                    "Partition": 0,
                    "Offset": 43111,
                    "Timestamp": 1553151704.6529999
                  },
                  "EventName": "bce:kafka:record",
                  "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
                  "EventSource": "bce:kafka"
                }
              ]
            }

          图片 3. 查看返回的执行结果 图片

          真实测试

          在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。

          图片

          并且在触发器的界面您也可以看到最后的执行结果

          图片

          上一篇
          CDN触发器
          下一篇
          定时任务触发器