KAFKA

Baidu Stream Computing (BSC)

KAFKA DDL

-- Example 1: read with startup.mode / specific offsets (FLINK engine)
CREATE TABLE kafka_table (
    `field01` STRING,
    `field02` BIGINT,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'topic_name',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'test_group',
    'connector.properties.ssl.filename' = 'kafka-key.zip',
    'connector.read.startup.mode' = 'specific',
    'connector.read.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
-- Example 2: read with starting/ending offsets (SPARK engine)
CREATE TABLE kafka_table (
    `field01` STRING,
    `field02` BIGINT,
    `field03` FLOAT,
    `field04` BINARY,
    `field05` INT,
    `field06` TINYINT,
    `field07` BOOLEAN,
    `field08` DATE,
    `field09` DOUBLE,
    `field10` SMALLINT
) WITH (
    'connector.type' = 'KAFKA',
    'connector.topic' = 'topic_name',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.ssl.filename' = 'kafka-key.zip',
    'connector.read.starting-offsets' = '{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}',
    'connector.read.ending-offsets' = '{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}',
    'connector.read.fail-on-data-loss' = 'true',
    'connector.read.max-offsets-per-trigger' = '1000000'
);
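The `starting-offsets` / `ending-offsets` values are JSON objects keyed by topic and then by partition. Following the convention of Spark Structured Streaming's Kafka source (which the SPARK engine options here mirror; this mapping is an assumption, not stated in the table), `-2` denotes the earliest offset and `-1` the latest. A minimal Python sketch that builds such an offset specification:

```python
import json

# Sentinel offsets following Spark's Kafka source convention
# (assumed to apply to BSC's SPARK engine as well):
EARLIEST = -2  # start from the earliest available offset
LATEST = -1    # start from the latest offset

def offsets_json(spec):
    """Serialize {topic: {partition: offset}} into the
    connector.read.starting-offsets JSON format."""
    # JSON object keys must be strings, so partition ids are stringified.
    return json.dumps(
        {topic: {str(p): off for p, off in parts.items()}
         for topic, parts in spec.items()},
        separators=(",", ":"), sort_keys=True)

starting = offsets_json({"topicA": {0: 23, 1: LATEST}, "topicB": {0: EARLIEST}})
print(starting)  # {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
```

The resulting string can be pasted directly as the value of `connector.read.starting-offsets` in the DDL above.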

KAFKA Parameter Settings

| Name | Short Name | Required | Example | SPARK | FLINK | Description |
| --- | --- | --- | --- | --- | --- | --- |
| connector.type | type | Y | KAFKA | Y | Y | Service type |
| format.encode | encode | Y | JSON / CSV | Y | Y | Data encoding |
| connector.version | version | | 0.11 | | | KAFKA cluster version |
| connector.topic | topic | Y | topicName | Y | Y | KAFKA topic name |
| connector.properties.bootstrap.servers | bootstrap.servers | Y | localhost:9092 | Y | Y | KAFKA server address |
| connector.properties.group.id | group.id | | test_group | | Y | KAFKA group ID |
| connector.properties.security.protocol | security.protocol | Y | SASL_PLAINTEXT / SASL_SSL / PLAINTEXT / SSL | Y | Y | KAFKA security protocol |
| connector.properties.ssl.filename | ssl.filename | | kafka-key.zip | Y | Y | Name of the KAFKA certificate when using an SSL protocol |
| connector.properties.sasl.mechanism | sasl.mechanism | | SCRAM-SHA-512 | Y | Y | SASL mechanism when using a SASL protocol |
| connector.properties.sasl.jaas.config | sasl.jaas.config | | e.g. org.apache.kafka.common.security.scram.ScramLoginModule sufficient username="admin" password="password"; | Y | Y | JAAS config contents when using a SASL protocol |
| connector.read.startup.mode | startup.mode | | earliest / latest / specific | | Y | KAFKA startup mode |
| connector.read.startup.specific-offsets | startup.specific-offsets | | partition:0,offset:42;partition:1,offset:300 | | Y | Startup offsets when startup.mode is specific |
| connector.read.starting-offsets | starting-offsets | | earliest / latest / {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} | Y | | Starting read position |
| connector.read.ending-offsets | ending-offsets | | {"topic1":{"0":50,"1":-1},"topic2":{"0":-1}} | Y | | Ending read position |
| connector.read.fail-on-data-loss | fail-on-data-loss | | true / false | Y | | Whether to fail the job when data may have been lost |
| connector.read.max-offsets-per-trigger | max-offsets-per-trigger | | 1000 | Y | | Maximum number of offsets processed per trigger |
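The security-related properties above can be combined in a single DDL. The following is a sketch assuming a SASL_SSL cluster with SCRAM authentication; the topic name, server address, certificate file, and credentials are placeholders:

```sql
CREATE TABLE kafka_sasl_table (
    `field01` STRING,
    `field02` BIGINT
) WITH (
    'connector.type' = 'KAFKA',
    'format.encode' = 'JSON',
    'connector.topic' = 'topic_name',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.security.protocol' = 'SASL_SSL',
    'connector.properties.ssl.filename' = 'kafka-key.zip',
    'connector.properties.sasl.mechanism' = 'SCRAM-SHA-512',
    'connector.properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule sufficient username="admin" password="password";'
);
```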