函数计算CFC

    百度消息服务触发器

    百度消息服务触发器说明

    百度消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。

    如果在百度消息服务的 Topic 上检测到记录,您可以使用 CFC 函数从此 Topic 读取成批的数据来处理,并且 CFC 会定期轮询(每秒一次)Topic 中的新纪录。

    百度消息服务触发器创建

    用户可以为新建的函数或已有函数配置百度消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。

    这里假设您已经创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 CFC 控制台在函数管理页面中为函数配置百度消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。

    编写处理函数

    1. 登录管理控制台,选择“产品服务>云函数计算 CFC”,进入“函数列表”页面
    2. 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。

    在函数详情页中编写百度消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。

    # -*- coding: utf-8 -*-
    import base64
    import json
    def handler(event, context): 
    	for record in event['Records']:
        	# kafka value is base64 encoded so decode here
        	payload = base64.b64decode(record['Kafka']['Value'])
    		print("Decoded payload: " + payload)
    	return 'Successfully processed {} records.'.format(len(event['Records']))

    注:百度消息服务触发器的event消息体

    {
      "Records": [
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
          "Kafka": {
            "Key": "",          //base64后的key
            "Value": "MA==",    //base64后的value
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43110,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        },
        {
          "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
          "Kafka": {
            "Key": "",
            "Value": "MQ==",
            "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "Partition": 0,
            "Offset": 43111,
            "Timestamp": 1553151704.6529999
          },
          "EventName": "bce:kafka:record",
          "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
          "EventSource": "bce:kafka"
        }
      ]
    }

    配置百度消息服务触发器

    1. 登录管理控制台,选择“产品服务> 函数计算 CFC”,进入“函数列表”页面
    2. 点击需要添加百度消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
    3. 点击左侧导航栏中的“触发器”,进入函数配置页面。 图片
    4. 在函数配置页面中最下方点击“新增触发器”。 图片
    5. 在弹出框中,点击下拉框“选择事件源进行添加”,选择百度消息服务触发器。 图片
    6. 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。 - Topic 选择您将要监听的百度消息服务 Topic - 批处理大小:从 Topic 中一次读取的最大记录数,1-1000 - 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest - 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试 图片
    7. 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的百度消息服务触发器及其信息 图片

    测试触发器

    模拟测试

    1. 点击测试按钮 图片
    2. 输入测试事件,并点击执行

      测试内容如下

      	{
        "Records": [
          {
            "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110",
            "Kafka": {
              "Key": "",
              "Value": "MA==",
              "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
              "Partition": 0,
              "Offset": 43110,
              "Timestamp": 1553151704.6529999
            },
            "EventName": "bce:kafka:record",
            "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "EventSource": "bce:kafka"
          },
          {
            "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111",
            "Kafka": {
              "Key": "",
              "Value": "MQ==",
              "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
              "Partition": 0,
              "Offset": 43111,
              "Timestamp": 1553151704.6529999
            },
            "EventName": "bce:kafka:record",
            "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl",
            "EventSource": "bce:kafka"
          }
        ]
      }

    图片 3. 查看返回的执行结果 图片

    真实测试

    在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。

    图片

    并且在触发器的界面您也可以看到最后的执行结果

    图片

    上一篇
    CDN触发器
    下一篇
    定时任务触发器