Overview
Supported Connectors

| Service | SPARK Source | SPARK Sink | FLINK Source | FLINK Sink |
| --- | --- | --- | --- | --- |
| KAFKA | Y | Y | Y | Y |
| BOS | Y | Y | Y | Y |
| MQTT | Y | | Y | Y |
| RDS | Y | Y | Y | Y |
| ES | | Y | | Y |
| PALO | | Y | Y | Y |
| TSDB | | Y | | Y |
How to Use a Connector

Declare a table with CREATE TABLE and configure the connector through the WITH clause. For example, a KAFKA source table:
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` BIGINT,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'format.encode' = 'JSON',
    'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
    'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
    'connector.properties.group.id' = 'bsc123',
    'connector.read.startup.mode' = 'earliest'
);
```
Field Types

| TYPE | SPARK | FLINK | MAPPING |
| --- | --- | --- | --- |
| TINYINT | Y | Y | BYTE / TINYINT |
| SMALLINT | Y | Y | SHORT / SMALLINT |
| INT | Y | Y | INT / INTEGER |
| BIGINT | Y | Y | LONG / BIGINT |
| FLOAT | Y | Y | FLOAT |
| DOUBLE | Y | Y | DOUBLE |
| STRING | Y | Y | STRING / CHAR / VARCHAR |
| BINARY | Y | Y | BINARY / VARBINARY / BYTES |
| BOOLEAN | Y | Y | BOOLEAN / BOOL |
| TIMESTAMP | Y | Y | TIMESTAMP / SQL_TIMESTAMP |
| DECIMAL | Y | Y | DECIMAL |
| DATE | Y | Y | DATE / LOCAL_DATE |
| TIME | | Y | TIME / LOCAL_TIME |
| ARRAY | Y | Y | ARRAY |
| MAP | Y | Y | MAP |
| ROW | | Y | ROW |
| STRUCT | Y | | STRUCT |
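The scalar types above appear throughout the CREATE TABLE examples on this page. As a minimal sketch of declaring composite columns, assuming Flink-style generic type syntax (ARRAY<...>, MAP<...>, ROW<...>), which this page does not confirm:

```sql
-- Hypothetical illustration of composite column declarations.
-- The generic syntax below follows Flink SQL conventions and is an
-- assumption; consult the connector reference for exact BSC syntax.
CREATE TABLE source_complex_table (
    `tags`    ARRAY<STRING>,
    `metrics` MAP<STRING, DOUBLE>,
    `nested`  ROW<id BIGINT, name STRING>  -- ROW is FLINK-only per the table above
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092'
);
```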
Time Attributes

Note: SPARK only supports designating a time-typed column of the source table as the watermark, i.e. the time attribute used for window processing.

How EVENTTIME and PROCTIME are supported

| Attribute | Description | EVENTTIME example | PROCTIME example |
| --- | --- | --- | --- |
| watermark.field | Field from which the event time is extracted | 'field' | '' |
| watermark.threshold | Maximum allowed lateness for time windows | '2 seconds'; supported units: milliseconds, seconds, minutes, hours | '' |
| watermark.field.alias | Alias used for the time attribute in the SQL body | 'alias' | 'proctime' |
| watermark.field.pattern | Date pattern used to convert the field to a timestamp | 'yyyy-MM-dd HH:mm:ss' | '' |
| watermark.field.timezone | Time zone applied during the conversion | 'Asia/Shanghai' | '' |
Time field types supported by EVENTTIME and their corresponding settings

| Field type | watermark.field.pattern | watermark.field.timezone | Description |
| --- | --- | --- | --- |
| BIGINT | 's', 'ms', 'second', 'millisecond' | '' | The field is a LONG and is converted to milliseconds |
| STRING | 'yyyy-MM-dd HH:mm:ss' | 'Asia/Shanghai' | The field must be convertible to a date by the specified pattern |
| TIMESTAMP | '' | '' | The field must be in TZ format, e.g. 2018-05-20T00:08:00Z |
Date pattern and time zone reference

| pattern | timezone |
| --- | --- |
| yyyy-MM-dd'T'HH:mm:ss.SSS | Asia/Shanghai |
| yyyy-MM-dd'T'HH:mm:ss.SSS'Z' | UTC |
| yyyy-MM-dd'T'HH:mm:ss | Asia/Shanghai |
| yyyy-MM-dd'T'HH:mm:ss'Z' | UTC |
| yyyy-MM-dd HH:mm:ss.SSS | Asia/Shanghai |
| yyyy-MM-dd HH:mm:ss.SSS'Z' | UTC |
| yyyy-MM-dd HH:mm:ss | Asia/Shanghai |
| yyyy-MM-dd HH:mm:ss'Z' | UTC |
Connectors with EVENTTIME

When using EVENTTIME, designate a column of the source table as the timestamp and configure its watermark and other time attribute parameters. The examples below cover BIGINT, STRING, and TIMESTAMP event-time fields, the alias setting, and a SPARK job.
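A BIGINT event-time field: `watermark.field.pattern = 's'` indicates the values are in seconds, so the engine can convert them to milliseconds.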
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` BIGINT,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
    'connector.properties.group.id' = 'bsc123',
    'connector.read.startup.mode' = 'earliest',
    'watermark.field' = 'field02',        -- BIGINT event-time column
    'watermark.threshold' = '1 minutes',  -- maximum allowed lateness
    'watermark.field.pattern' = 's'       -- values are in seconds
);

CREATE TABLE sink_kafka_table (
    `timestamp` BIGINT,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    TO_BIGINT(TUMBLE_START(`field02`, INTERVAL '1' MINUTE)) AS `timestamp`,
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_kafka_table
GROUP BY
    TUMBLE(`field02`, INTERVAL '1' MINUTE),
    `field01`;
```
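A STRING event-time field: `watermark.field.pattern` and `watermark.field.timezone` describe how to parse the string into a timestamp (see the reference table above).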
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` STRING,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
    'connector.properties.group.id' = 'bsc123',
    'connector.read.startup.mode' = 'earliest',
    'watermark.field' = 'field02',                      -- STRING event-time column
    'watermark.threshold' = '1 minutes',
    'watermark.field.pattern' = 'yyyy-MM-dd HH:mm:ss',  -- parse pattern
    'watermark.field.timezone' = 'Asia/Shanghai'        -- time zone for parsing
);

CREATE TABLE sink_kafka_table (
    `timestamp` TIMESTAMP,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    TUMBLE_START(`field02`, INTERVAL '1' MINUTE) AS `timestamp`,
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_kafka_table
GROUP BY
    TUMBLE(`field02`, INTERVAL '1' MINUTE),
    `field01`;
```
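A TIMESTAMP event-time field: values must already be in TZ format (e.g. 2018-05-20T00:08:00Z), so no pattern or timezone is needed.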
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` TIMESTAMP,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
    'connector.properties.group.id' = 'bsc123',
    'connector.read.startup.mode' = 'earliest',
    'watermark.field' = 'field02',       -- TIMESTAMP column in TZ format
    'watermark.threshold' = '1 minutes'  -- no pattern or timezone required
);

CREATE TABLE sink_kafka_table (
    `timestamp` BIGINT,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    TO_BIGINT(TUMBLE_START(`field02`, INTERVAL '1' MINUTE)) AS `timestamp`,
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_kafka_table
GROUP BY
    TUMBLE(`field02`, INTERVAL '1' MINUTE),
    `field01`;
```
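Using `watermark.field.alias`: the event-time column is exposed under the given alias (here `rowtime`) and referenced by that name in the SQL body.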
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` TIMESTAMP,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
    'connector.properties.group.id' = 'bsc123',
    'connector.read.startup.mode' = 'earliest',
    'watermark.field' = 'field02',
    'watermark.threshold' = '1 minutes',
    'watermark.field.alias' = 'rowtime'  -- name used in the SQL body below
);

CREATE TABLE sink_kafka_table (
    `timestamp` BIGINT,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    TO_BIGINT(TUMBLE_START(`rowtime`, INTERVAL '1' MINUTE)) AS `timestamp`,
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_kafka_table
GROUP BY
    TUMBLE(`rowtime`, INTERVAL '1' MINUTE),
    `field01`;
```
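A SPARK job: SPARK declares the watermark the same way but groups with its own `window(...)` syntax, reading the window start from `window.start`.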
```sql
CREATE TABLE source_kafka_table (
    `field01` STRING,
    `field02` TIMESTAMP,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
    'watermark.field' = 'field02',
    'watermark.threshold' = '10 seconds'
);

CREATE TABLE sink_kafka_table (
    `timestamp` TIMESTAMP,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    window.start AS `timestamp`,    -- start of the SPARK window
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_kafka_table
GROUP BY
    window(`field02`, "1 MINUTE"),  -- SPARK window grouping syntax
    `field01`;
```
Connectors with PROCTIME

PROCTIME uses the job's processing time as the timestamp. In FLINK there is no need to designate a source-table column; just add the statement SET job.stream.timeType = 'PROCESSTIME'. For example, with an MQTT source:
```sql
SET job.stream.timeType = 'PROCESSTIME';

CREATE TABLE source_mqtt_table (
    `field01` STRING,
    `field02` BIGINT,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'MQTT',
    'format.encode' = 'JSON',
    'connector.url' = 'tcp://xxxxxx.mqtt.iot.gz.baidubce.com:1883',
    'connector.topic' = 'xxxx',
    'connector.username' = 'xxxxxxxxx/bsc_test',
    'connector.password' = 'xxxxxxxx',
    'connector.semantic' = 'at_least_once'
);
```
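To show how the processing-time attribute is then consumed, here is a minimal sketch that continues the example, assuming the default 'proctime' alias from the attributes table above; the sink table and query are illustrative, not from the original:

```sql
-- Hypothetical continuation: a tumbling window over processing time,
-- referenced through the default 'proctime' alias.
CREATE TABLE sink_kafka_table (
    `timestamp` BIGINT,
    `field01` STRING,
    `count` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
    'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
    'connector.properties.ssl.filename' = 'kafka-key_gz.zip'
);

INSERT INTO
    sink_kafka_table
SELECT
    TO_BIGINT(TUMBLE_START(`proctime`, INTERVAL '1' MINUTE)) AS `timestamp`,
    `field01`,
    COUNT(`field05`) AS `count`
FROM
    source_mqtt_table
GROUP BY
    TUMBLE(`proctime`, INTERVAL '1' MINUTE),
    `field01`;
```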