CDN 日志提取中转(ETL)
所有文档

          百度流式计算 BSC

          CDN 日志提取中转(ETL)

          业务场景

          用户对 CDN 日志进行提取中转,属于 ETL 场景, 用于数据的实时清洗、归并和结构化。

          业务描述

          所有的 CDN 日志通过 flume 直接推送到 百度消息服务(BKAFKA)中作为流式计算 source , 在我们 BSC 中创建 SPARK_STREAM/SQL 类型的作业用于 CDN 日志的提取中转,并实时将结果写到 百度消息服务(BKAFKA)或 对象存储(BOS)当中,用户可以对 sink 端的 BKAFKA / BOS 进行进一步的处理。

          处理流程

          服务器 → BKAFKA → BSC → BKAFKA / BOS → 其他

          案例实现

          一个完整的 Spark SQL 作业由 source 表、sink 表和 DML 语句构成。

          定义 BKAFKA Source 表

          CREATE TABLE source_kafka_table (
              `prefix` STRING,
              `region` STRING,
              `userIdSrc` STRING,
              `clusterNameSrc` STRING,
              `transDurationSrc` DOUBLE,
              `srcDurationSrc` STRING,
              `ts` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'CSV',
              'format.attributes.field-delimiter' = ' ''connector.topic' = 'xxxxxxxxx__bsc-source',
              'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
              'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
          );

          定义 BKAFKA / BOS Sink 表

          CREATE TABLE sink_table (
              `timestamp` TIMESTAMP,
              `region` STRING,
              `userIdSrc` STRING,
              `clusterNameSrc` STRING
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'CSV',
              'format.attributes.field-delimiter' = ',''connector.topic' = 'xxxxxxxxx__bsc-source',
              'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
              'connector.properties.ssl.filename' = 'kafka-key_bd.zip'
          );
          CREATE TABLE sink_table (
              `timestamp` TIMESTAMP,
              `region` STRING,
              `userIdSrc` STRING,
              `clusterNameSrc` STRING
          ) WITH (
              'connector.type' = 'BOS',
              'format.encode' = 'JSON',
              'connector.path' = 'bos://asc-sandbox-su/bos-source/json/'
          );

          编写数据提取DML语句

          根据 prefix 对日志内容进行提取,并存放到下游的云服务中,为之后的其他处理做数据清洗。

          INSERT INTO
             sink_table outputmode append
          SELECT
             from_unixtime(`ts`/1000-(`ts`/1000)%60,'yyyy-MM-dd HH') AS `timestamp`,
             `region`,
             `userIdSrc`,
             `clusterNameSrc`
          FROM
             source_kafka_table
          WHERE
             prefix = 'xxxxxxxx';
          上一篇
          物设备报警情况实时统计
          下一篇
          CDN 接口日志聚合统计