KAFKA
更新时间:2023-12-07
KAFKA DDL
CREATE TABLE kafka_table (
`field01` STRING,
`field02` BIGINT,
`field03` FLOAT,
`field04` BINARY,
`field05` INT,
`field06` TINYINT,
`field07` BOOLEAN,
`field08` DATA,
`field09` DOUBLE,
`field10` SMALLINT
) WITH (
'connector.type' = 'KAFKA',
'format.encode' = 'JSON',
'connector.topic' = 'topic_name',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'test_group',
'connector.properties.ssl.filename' = 'kafka-key.zip',
'connector.read.startup.mode' = 'specific',
'connector.read.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
CREATE TABLE kafka_table (
`field01` STRING,
`field02` BIGINT,
`field03` FLOAT,
`field04` BINARY,
`field05` INT,
`field06` TINYINT,
`field07` BOOLEAN,
`field08` DATA,
`field09` DOUBLE,
`field10` SMALLINT
) WITH (
'connector.type' = 'KAFKA',
'connector.topic' = 'topic_name',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.ssl.filename' = 'kafka-key.zip',
'connector.read.starting-offsets' = '{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}',
'connector.read.ending-offsets' = '{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}',
'connector.read.fail-on-data-loss' = 'true',
'connector.read.max-offsets-per-trigger' = '1000000'
);
KAFKA 参数设置
名称 | 简称 | 是否必填 | 用例 | SPARK | FLINK | 说明 |
---|---|---|---|---|---|---|
connector.type | type | Y | KAFKA |
Y | Y | 服务类型 |
format.encode | encode | Y | JSON / CSV |
Y | Y | 数据编码 |
connector.version | version | 0.11 |
KAFKA 集群版本 | |||
connector.topic | topic | Y | topicName |
Y | Y | KAFKA TOPIC名称 |
connector.properties.bootstrap.servers | bootstrap.servers | Y | localhost:9092 |
Y | Y | KAFKA SERVER地址 |
connector.properties.group.id | group.id | test_group |
Y | KAFKA GROUP ID | ||
connector.properties.security.protocol | security.protocol | Y | SASL_PLAINTEXT /SASL_SSL /PLAINTEXT /SSL |
Y | Y | KAFKA协议 |
connector.properties.ssl.filename | ssl.filename | kafka-key.zip |
Y | Y | SSL协议的时候,指定KAFKA 证书名称 | |
connector.properties.sasl.mechanism | sasl.mechanism | SCRAM-SHA-512 |
Y | Y | SASL协议的时候,指定sasl mechanism | |
connector.properties.sasl.jaas.config | sasl.jaas.config | 例如org.apache.kafka.common.security.scram.ScramLoginModule sufficient username="admin" password="password"; |
Y | Y | SASL协议的时候,指定jaas config文件内容 | |
connector.read.startup.mode | startup.mode | earliest / latest / specific |
Y | KAFKA 启动模式 | ||
connector.read.startup.specific-offsets | startup.specific-offsets | partition:0,offset:42;partition:1,offset:300 |
Y | 指定 KAFKA 启动的 offset | ||
connector.read.starting-offsets | starting-offsets | earliest / latest / {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} |
Y | 指定 kafka 的启动位置 | ||
connector.read.ending-offsets | ending-offsets | {"topic1":{"0":50,"1":-1},"topic2":{"0":-1}} |
Y | 指定 kafka 的结束位置 | ||
connector.read.fail-on-data-loss | fail-on-data-loss | true / false |
Y | |||
connector.read.max-offsets-per-trigger | max-offsets-per-trigger | 1000 |
Y |