百度智能云

百度智能云百度智能云
最佳实践 > API 日志调用统计
API 日志调用统计
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 BKAFKA Source 表
  • 定义 TSDB Sink 表
  • 编写数据统计DML语句
  • 相关产品

API 日志调用统计

更新时间:

概览

用户拥有多台服务器,托管了一些 API 调用服务,现在想统计 API 的调用情况,形成图表。

需求场景

所有机器的 API 调用日志通过 自定义日志采集程序 进行日志采集后推送到 百度消息服务(BKAFKA)中作为流式计算 source , 在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于 API 日志的聚合统计,并实时将聚合结果写到 时序时空数据库(TSDB)当中,用户可以通过 TSDB 的可视化面板或者利用 数据可视化工具(如 Sugar BI)等调用 TSDB 的数据 API 完成数据展示。

方案概述

服务器 → 自定义日志采集程序 → BKAFKA → BSC → TSDB → Sugar BI

配置步骤

一个完整的 Flink SQL 作业由 source 表、sink 表和 DML 语句构成。

定义 BKAFKA Source 表

```SQL label=FLINK
CREATE TABLE source_kafka_table (
    `timestamp` BIGINT,
    `status` INTEGER,
    `contentLength` BIGINT,
    `latency` BIGINT,
    `groupUuid` STRING,
    `apiUuid` STRING
) WITH (
    'connector.type' = 'BKAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip',
    'connector.properties.group.id' = 'test_group',
    'connector.read.startup.mode' = 'latest',
    'watermark.field' = 'timestamp',
    'watermark.threshold' = '1 minutes'
);
```

定义 TSDB Sink 表

```SQL label=FLINK
CREATE TABLE sink_tsdb_table (
    `datapoints` ARRAY < ROW(
        `timestamp` BIGINT,
        `metric` STRING,
        `value` BIGINT,
        `tags` MAP < STRING,
        STRING >
    ) >
) WITH (
    'connector.type' = 'TSDB',
    'format.encode' = 'JSON',
    'connector.emit' = 'BATCH',
    'connector.url' = 'http://xxxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
    'connector.write.max-message-num-per-batch' = '2000'
);
```

编写数据统计DML语句

统计每分钟按照 apiUuid、groupUuid、status 进行聚合的结果,每个 Query 产生3个 TSDB datapoints,并实时写入到 TSDB 中。这里通过嵌套子查询的方式来使SQL结构更加清晰。选取 timestamp 字段作为 Eventtime 的watermark,延迟设置为1分钟。聚合时采用滚动窗口,窗口大小为1分钟。

```SQL label=FLINK
INSERT INTO
    sink_tsdb_table
SELECT
    ARRAY [
        ROW(`timestamp`, `count_name` , `count`, `common_tags`),
        ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
        ROW(`timestamp`, `latency_name`, `latency`, `common_tags`)
    ]
FROM
    (
        SELECT
            `timestamp`,
            'count' AS `count_name`,
            `count`,
            'traffic' AS `traffic_name`,
            `traffic`,
            'latency' AS `latency_name`,
            `latency`,
            MAP ['apiUuid', `apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags`
        FROM
            (
                SELECT
                    TO_BIGINT(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`,
                    COUNT(1) AS `count`,
                    SUM(contentLength) AS `traffic`,
                    SUM(latency) AS `latency`,
                    `apiUuid` AS `apiUuid`,
                    `groupUuid` AS `groupUuid`,
                    `status` AS `status`
                FROM
                    (
                        SELECT
                            `timestamp`,
                            `contentLength`,
                            `latency`,
                            `apiUuid`,
                            `groupUuid`,
                            CASE
                                WHEN status >= 200
                                AND status < 300 THEN '2xx'
                                WHEN status >= 300
                                AND status < 200 THEN '3xx'
                                WHEN status >= 400
                                AND status < 500 THEN '4xx'
                                WHEN status >= 500
                                AND status < 600 THEN '5xx'
                                ELSE 'oth'
                            END AS `status`
                        FROM
                            source_kafka_table
                    ) AS taba
                GROUP BY
                    TUMBLE(`timestamp`, INTERVAL '1' MINUTE),
                    `apiUuid`,
                    `groupUuid`,
                    `status`
            ) AS tabb
    ) AS tabc
```

相关产品

消息服务 for Kafka时序时空数据库TSDB数据可视化Sugar BI