Overview

          Supported Connectors

          Service    SPARK             FLINK
                     Source   Sink     Source   Sink
          BKAFKA     Y        Y        Y        Y
          BOS        Y        Y        Y        Y
          MQTT       Y        Y        Y
          RDS        Y        Y        Y        Y
          ES         Y        Y
          PALO       Y        Y
          TSDB       Y        Y

          How to Use a Connector

          A connector table is declared with a CREATE TABLE statement; the WITH clause carries the connector-specific properties, for example:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` BIGINT,
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'format.encode' = 'JSON',
              'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
              'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest'
          );

          Field Types

          TYPE        SPARK   FLINK   MAPPING
          TINYINT     Y       Y       BYTE / TINYINT
          SMALLINT    Y       Y       SHORT / SMALLINT
          INT         Y       Y       INT / INTEGER
          BIGINT      Y       Y       LONG / BIGINT
          FLOAT       Y       Y       FLOAT
          DOUBLE      Y       Y       DOUBLE
          STRING      Y       Y       STRING / CHAR / VARCHAR
          BINARY      Y       Y       BINARY / VARBINARY / BYTES
          BOOLEAN     Y       Y       BOOLEAN / BOOL
          TIMESTAMP   Y       Y       TIMESTAMP / SQL_TIMESTAMP
          DECIMAL     Y       Y       DECIMAL
          DATE        Y       Y       DATE / LOCAL_DATE
          TIME                Y       TIME / LOCAL_TIME
          ARRAY       Y       Y       ARRAY
          MAP         Y       Y       MAP
          ROW                 Y       ROW
          STRUCT      Y               STRUCT
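
          The scalar types appear in the example above; composite types are declared inline in the DDL. Below is a minimal sketch for the FLINK engine, assuming BSC follows standard Flink SQL syntax for ARRAY, MAP, and ROW (the table name and fields are illustrative; the WITH options mirror the earlier BKAFKA example):

          CREATE TABLE source_composite_table (
              `field01` ARRAY<STRING>,         -- array of strings
              `field02` MAP<STRING, BIGINT>,   -- map from string keys to long values
              `field03` ROW<a INT, b STRING>   -- nested row (FLINK only; use STRUCT on SPARK)
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
              'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest'
          );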

          Time Attributes

          Note: SPARK only supports designating a time-typed column of the source table as the watermark used for window processing (see the SPARK example at the end of this section).

          Attributes for EVENTTIME and PROCTIME

          Attribute                  Description                                             EVENTTIME example                                            PROCTIME example
          watermark.field            Field from which the event time is extracted           'field'                                                      ''
          watermark.threshold        Maximum allowed lateness of the time window            '2 seconds' (units: milliseconds, seconds, minutes, hours)   ''
          watermark.field.alias      Alias of the time attribute used in the SQL body       'alias'                                                      'proctime'
          watermark.field.pattern    Date pattern used to convert the field to a timestamp  'yyyy-MM-dd HH:mm:ss'                                        ''
          watermark.field.timezone   Time zone applied when converting with the pattern     'Asia/Shanghai'                                              ''
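
          Taken together, these attributes live in the source table's WITH clause. Below is a minimal sketch for an EVENTTIME source with a STRING time field, combining all five keys (values are illustrative; the full examples later in this document show each variant in context):

          CREATE TABLE source_event_table (
              `event_time` STRING,   -- e.g. 2018-05-20 00:11:00
              `payload` STRING
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
              'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest',
              'watermark.field' = 'event_time',                   -- column carrying event time
              'watermark.threshold' = '2 seconds',                -- maximum lateness
              'watermark.field.alias' = 'rowtime',                -- name used in the SQL body
              'watermark.field.pattern' = 'yyyy-MM-dd HH:mm:ss',  -- how to parse the STRING field
              'watermark.field.timezone' = 'Asia/Shanghai'        -- time zone for parsing
          );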

          Field Types Supported for EVENTTIME and Their Parameter Settings

          Field type   watermark.field.pattern              watermark.field.timezone   Description
          BIGINT       's', 'ms', 'second', 'millisecond'   ''                         The field is a LONG value and is converted to milliseconds
          STRING       'yyyy-MM-dd HH:mm:ss'                'Asia/Shanghai'            The field must be parseable into a date with the given pattern
          TIMESTAMP    ''                                   ''                         The field must be in TZ format, e.g. 2018-05-20T00:08:00Z

          Date Pattern and Time Zone Reference

          pattern                        timezone
          yyyy-MM-dd'T'HH:mm:ss.SSS      Asia/Shanghai
          yyyy-MM-dd'T'HH:mm:ss.SSS'Z'   UTC
          yyyy-MM-dd'T'HH:mm:ss          Asia/Shanghai
          yyyy-MM-dd'T'HH:mm:ss'Z'       UTC
          yyyy-MM-dd HH:mm:ss.SSS        Asia/Shanghai
          yyyy-MM-dd HH:mm:ss.SSS'Z'     UTC
          yyyy-MM-dd HH:mm:ss            Asia/Shanghai
          yyyy-MM-dd HH:mm:ss'Z'         UTC

          Connectors with EVENTTIME

          When using EVENTTIME, designate one column of the source table as the timestamp and configure its watermark and related time attributes. The examples below show a BIGINT, a STRING, and a TIMESTAMP event-time field (with and without an alias) on the FLINK engine, followed by a SPARK example.

          FLINK, event time as BIGINT:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` BIGINT,   -- BIGINT timestamp, e.g. 1523525179, in seconds
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest',
              'watermark.field' = 'field02',
              'watermark.threshold' = '1 minutes',
              'watermark.field.pattern' = 's' -- unit of the timestamp field; defaults to ms
          );
          CREATE TABLE sink_kafka_table (
              `timestamp` BIGINT,
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              TO_BIGINT(TUMBLE_START(`field02`, INTERVAL '1' MINUTE)) AS `timestamp`,
              `field01`,
              COUNT(`field05`)
          FROM
              source_kafka_table
          GROUP BY 
              TUMBLE(`field02`, INTERVAL '1' MINUTE),
              `field01`;

          FLINK, event time as STRING:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` STRING,  -- STRING timestamp, e.g. 2018-05-20 00:11:00
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest',
              'watermark.field' = 'field02',
              'watermark.threshold' = '1 minutes',
              'watermark.field.pattern' = 'yyyy-MM-dd HH:mm:ss',  -- format of the timestamp field
              'watermark.field.timezone' = 'Asia/Shanghai'
          );
          CREATE TABLE sink_kafka_table (
              `timestamp` TIMESTAMP,
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              TUMBLE_START(`field02`, INTERVAL '1' MINUTE) AS `timestamp`,
              `field01`,
              COUNT(`field05`) AS `count`
          FROM
              source_kafka_table
          GROUP BY 
              TUMBLE(`field02`, INTERVAL '1' MINUTE),
              `field01`;

          FLINK, event time as TIMESTAMP:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` TIMESTAMP,  -- TIMESTAMP in TZ format, e.g. 2018-05-20T00:11:00Z
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest',
              'watermark.field' = 'field02',
              'watermark.threshold' = '1 minutes'
          );
          CREATE TABLE sink_kafka_table (
              `timestamp` BIGINT,
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              TO_BIGINT(TUMBLE_START(`field02`, INTERVAL '1' MINUTE)) AS `timestamp`,
              `field01`,
              COUNT(`field05`)
          FROM
              source_kafka_table
          GROUP BY 
              TUMBLE(`field02`, INTERVAL '1' MINUTE),
              `field01`;

          FLINK, event time as TIMESTAMP with an alias:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` TIMESTAMP,  -- TIMESTAMP in TZ format, e.g. 2018-05-20T00:11:00Z
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
              'connector.properties.group.id' = 'bsc123',
              'connector.read.startup.mode' = 'earliest',
              'watermark.field' = 'field02',
              'watermark.threshold' = '1 minutes',
              'watermark.field.alias' = 'rowtime'
          );
          CREATE TABLE sink_kafka_table (
              `timestamp` BIGINT,
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              TO_BIGINT(TUMBLE_START(`rowtime`, INTERVAL '1' MINUTE)) AS `timestamp`,
              `field01`,
              COUNT(`field05`)
          FROM
              source_kafka_table
          GROUP BY 
              TUMBLE(`rowtime`, INTERVAL '1' MINUTE),
              `field01`;

          SPARK, event time as TIMESTAMP:

          CREATE TABLE source_kafka_table (
              `field01` STRING,
              `field02` TIMESTAMP,  -- SPARK supports TIMESTAMP as the window column
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
              'watermark.field' = 'field02',
              'watermark.threshold' = '10 seconds'
          );
          CREATE TABLE sink_kafka_table (
              `timestamp` TIMESTAMP,
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              window.start AS `timestamp`,
              `field01`,
              COUNT(`field05`) AS `count`
          FROM
              source_kafka_table
          GROUP BY 
              window(`field02`, "1 MINUTE"),
              `field01`;

          Connectors with PROCTIME

          To use the job's processing time as the timestamp, FLINK does not require designating a column of the source table; simply add the statement SET job.stream.timeType = 'PROCESSTIME'.

          SET job.stream.timeType = 'PROCESSTIME'; -- tell Flink to use PROCTIME via a SET statement
          CREATE TABLE source_mqtt_table (
              `field01` STRING,
              `field02` BIGINT,
              `field03` FLOAT,
              `field04` BINARY,
              `field05` INT,
              `field06` TINYINT,
              `field07` BOOLEAN,
              `field08` DATE,
              `field09` DOUBLE,
              `field10` SMALLINT
          ) WITH (
              'connector.type' = 'MQTT',
              'format.encode' = 'JSON',
              'connector.url' = 'tcp://xxxxxx.mqtt.iot.gz.baidubce.com:1883',
              'connector.topic' = 'xxxx',
              'connector.username' = 'xxxxxxxxx/bsc_test',
              'connector.password' = 'xxxxxxxx',
              'connector.semantic' = 'at_least_once'
          );
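
          Once processing time is enabled, the time attribute can be referenced in windows through its alias. Below is a minimal sketch, assuming the default alias 'proctime' from the attribute table above (the sink table and its fields are illustrative):

          CREATE TABLE sink_kafka_table (
              `field01` STRING,
              `count` BIGINT
          ) WITH (
              'connector.type' = 'BKAFKA',
              'format.encode' = 'JSON',
              'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
              'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
              'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
          );
          INSERT INTO
              sink_kafka_table
          SELECT
              `field01`,
              COUNT(`field05`) AS `count`
          FROM
              source_mqtt_table
          GROUP BY
              TUMBLE(`proctime`, INTERVAL '1' MINUTE),
              `field01`;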