CDN 接口日志聚合统计
所有文档

          百度流式计算 BSC

          CDN 接口日志聚合统计

          业务场景

          用户对 CDN 接口日志进行聚合统计。

          业务描述

          所有的 CDN 接口调用日志通过 flume 直接推送到 百度消息服务(BKAFKA)中作为流式计算 source , 在我们 BSC 中创建 SPARK_STREAM/SQL 类型的作业用于 CDN 接口调用日志的聚合统计,并实时将聚合结果写到 百度数据仓库(Palo)当中,用户可以利用 数据可视化工具(如 Suger)等调用 Palo 的 API 完成数据展示。

          处理流程

          服务器 → BKAFKA → BSC → Palo → Sugar

          案例实现

          一个完整的 Spark SQL 作业由 source 表、sink 表和 DML 语句构成。

          定义 BKAFKA Source 表

          CREATE TABLE source_kafka_table (
              `prefix` STRING,
              `region` STRING,
              `userIdSrc` STRING,
              `clusterNameSrc` STRING,
              `transDurationSrc` DOUBLE,
              `srcDurationSrc` STRING,
              `ts` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'CSV',
              'format.attributes.field-delimiter' = ' ', -- 分隔符为空格
              'connector.topic' = 'xxxxxxxxx__bsc-source',
              'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
              'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
          );

          定义 Palo Sink 表

          CREATE TABLE sink_palo_table (
              `field` STRING
          ) WITH (
              'connector.type' = 'PALO',
              'format.encode' = 'TXT',
              'format.attributes.field-delimiter' = ','  -- 默认分隔符为逗号
              'connector.cluster-id' = 'xxxxxxxx',
              'connector.username' = 'admin',
              'connector.password' = 'xxxxx',
              'connector.database' = 'mct',
              'connector.table' = 'mct_statistics',
              'connector.mysql-host' = 'xxxxxxxxx-xxxxxxx-fe.palo.bd.baidubce.com',
              'connector.mysql-port' = '9030',
              'connector.compute-node-host' = 'xxxxxxxxx-xxxxxxx-be.palo.bd.baidubce.com',
              'connector.compute-node-port' = '8040'
          );

          编写数据聚合DML语句

          按照某些值和指定的时间进行聚合,没有使用窗口,而是定义 5 分钟的微批触发时间来完成聚合,并且聚合状态要设置为 no state

          INSERT INTO
             sink_palo_table outputmode append
          SELECT
             format_string('%s,%d,%s,%s,%s,%d,%f,%f\n',
                 from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH'),
                 unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd HH:mm'),
                 `region`,
                 `userIdSrc`,
                 `clusterNameSrc`,
                 count(*),
                 sum(if(`srcDurationSrc` != 'null', cast(`srcDurationSrc` as double), 0)/(if(`transDurationSrc` != 0, `transDurationSrc`, 0.01))),
                 sum(`transDurationSrc`)
              )
          FROM
             source_kafka_table
          WHERE
             prefix = 'xxxxxxxx'
          GROUP BY
             `userIdSrc`,
             `clusterNameSrc`,
             `region`,
             from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH'),
             unix_timestamp(from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH:mm'),'yyyy-MM-dd HH:mm') aggregate no state;  -- 聚合过程设置为无状态
          上一篇
          CDN 日志提取中转(ETL)
          下一篇
          物联网设备实时监控预警