API Call Log Statistics
Last updated: 2023-12-07
Overview
A user has multiple servers hosting API services and wants to collect statistics on the API calls and present them as charts.
Requirement Scenario
API call logs from all machines are collected by a custom log collection program and pushed to Baidu Message Service (KAFKA), which acts as the streaming-compute source. In BSC, a job of type FLINK_STREAM/SQL is created to aggregate the API logs and write the aggregated results to the Time Series Database (TSDB) in real time. The data can then be displayed either through the TSDB visualization panel or by using a data visualization tool (such as Sugar BI) that calls the TSDB data API.
Solution Overview
Server → Custom log collector → KAFKA → BSC → TSDB → Sugar BI
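For reference, the following is a minimal sketch of what the custom log collector might do, assuming the kafka-python client and JSON-encoded records. The topic and bootstrap server are the ones used in the source table configuration below; the SSL options required by the downloaded certificate bundle are omitted here.
PYTHON
# A minimal sketch of the custom log collector, assuming the kafka-python client
# and JSON-encoded records. Topic and bootstrap server match the source table
# below; the SSL settings from the kafka-key certificate bundle are omitted and
# must be added for Baidu Message Service.
import json
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='kafka.bd.baidubce.com:9071',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # security_protocol='SSL', ssl_cafile=..., ssl_certfile=..., ssl_keyfile=...
)

# One API access log record; field names must match the source table columns.
record = {
    'timestamp': int(time.time() * 1000),  # epoch milliseconds (assumed)
    'status': 200,
    'contentLength': 512,
    'latency': 35,
    'groupUuid': 'group-demo',
    'apiUuid': 'api-demo',
}
producer.send('xxxxxxxxx__bsc-source', record)
producer.flush()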
Configuration Steps
A complete Flink SQL job consists of a source table, a sink table, and a DML statement.
Define the KAFKA source table
FLINK
CREATE TABLE source_kafka_table (
    `timestamp` BIGINT,
    `status` INTEGER,
    `contentLength` BIGINT,
    `latency` BIGINT,
    `groupUuid` STRING,
    `apiUuid` STRING
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxx__bsc-source',
    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
    'connector.properties.ssl.filename' = 'kafka-key_bd.zip',
    'connector.properties.group.id' = 'test_group',
    'connector.read.startup.mode' = 'latest',
    'watermark.field' = 'timestamp',
    'watermark.threshold' = '1 minutes'
);
Define the TSDB sink table
FLINK
CREATE TABLE sink_tsdb_table (
    `datapoints` ARRAY < ROW(
        `timestamp` BIGINT,
        `metric` STRING,
        `value` BIGINT,
        `tags` MAP < STRING, STRING >
    ) >
) WITH (
    'connector.type' = 'TSDB',
    'format.encode' = 'JSON',
    'connector.emit' = 'BATCH',
    'connector.url' = 'http://xxxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
    'connector.write.max-message-num-per-batch' = '2000'
);
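Since 'format.encode' is 'JSON', each row written through this sink is serialized according to the datapoints schema declared above. As a purely illustrative sketch (all values made up), one sink row corresponds to a JSON body of roughly this shape:
PYTHON
# Illustrative only: the JSON shape that one sink row is serialized to,
# following the datapoints schema declared above (all values are made up).
import json

body = {
    "datapoints": [
        {
            "timestamp": 1670000000000,  # window start, epoch milliseconds
            "metric": "count",           # metric name assigned in the DML below
            "value": 1280,
            "tags": {"apiUuid": "api-demo", "groupUuid": "group-demo", "status": "2xx"},
        },
        # ...plus one entry each for 'traffic' and 'latency'
    ],
}
print(json.dumps(body, indent=2))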
Write the aggregation DML statement
Aggregate the logs per minute by apiUuid, groupUuid, and status; each aggregated result produces 3 TSDB datapoints (count, traffic, latency), which are written to TSDB in real time. Nested subqueries are used to keep the SQL structure clear. The timestamp field is chosen as the event-time watermark, with the allowed lateness set to 1 minute. The aggregation uses a tumbling window with a window size of 1 minute.
FLINK
INSERT INTO
    sink_tsdb_table
SELECT
    ARRAY [
        ROW(`timestamp`, `count_name`, `count`, `common_tags`),
        ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
        ROW(`timestamp`, `latency_name`, `latency`, `common_tags`)
    ]
FROM
    (
        SELECT
            `timestamp`,
            'count' AS `count_name`,
            `count`,
            'traffic' AS `traffic_name`,
            `traffic`,
            'latency' AS `latency_name`,
            `latency`,
            MAP ['apiUuid', `apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags`
        FROM
            (
                SELECT
                    TO_BIGINT(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`,
                    COUNT(1) AS `count`,
                    SUM(contentLength) AS `traffic`,
                    SUM(latency) AS `latency`,
                    `apiUuid` AS `apiUuid`,
                    `groupUuid` AS `groupUuid`,
                    `status` AS `status`
                FROM
                    (
                        SELECT
                            `timestamp`,
                            `contentLength`,
                            `latency`,
                            `apiUuid`,
                            `groupUuid`,
                            CASE
                                WHEN status >= 200
                                AND status < 300 THEN '2xx'
                                WHEN status >= 300
                                AND status < 400 THEN '3xx'
                                WHEN status >= 400
                                AND status < 500 THEN '4xx'
                                WHEN status >= 500
                                AND status < 600 THEN '5xx'
                                ELSE 'oth'
                            END AS `status`
                        FROM
                            source_kafka_table
                    ) AS taba
                GROUP BY
                    TUMBLE(`timestamp`, INTERVAL '1' MINUTE),
                    `apiUuid`,
                    `groupUuid`,
                    `status`
            ) AS tabb
    ) AS tabc
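To make the logic concrete, the following plain-Python sketch (illustration only, not part of the job) reproduces the same status bucketing and one-minute tumbling aggregation on a couple of in-memory records.
PYTHON
# Plain-Python illustration (not part of the job) of the same status bucketing
# and one-minute tumbling aggregation performed by the SQL above.
from collections import defaultdict

def status_bucket(status):
    # Mirrors the CASE expression: 2xx / 3xx / 4xx / 5xx, everything else 'oth'.
    if 200 <= status < 300: return '2xx'
    if 300 <= status < 400: return '3xx'
    if 400 <= status < 500: return '4xx'
    if 500 <= status < 600: return '5xx'
    return 'oth'

logs = [
    {'timestamp': 1670000012000, 'status': 200, 'contentLength': 512,
     'latency': 35, 'groupUuid': 'group-demo', 'apiUuid': 'api-demo'},
    {'timestamp': 1670000047000, 'status': 502, 'contentLength': 128,
     'latency': 950, 'groupUuid': 'group-demo', 'apiUuid': 'api-demo'},
]

agg = defaultdict(lambda: {'count': 0, 'traffic': 0, 'latency': 0})
for log in logs:
    window_start = log['timestamp'] // 60000 * 60000  # 1-minute tumbling window
    key = (window_start, log['apiUuid'], log['groupUuid'], status_bucket(log['status']))
    agg[key]['count'] += 1
    agg[key]['traffic'] += log['contentLength']
    agg[key]['latency'] += log['latency']

# Each key yields three datapoints (count, traffic, latency) in the real job.
for (ts, api, group, status), metrics in agg.items():
    print(ts, api, group, status, metrics)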