MQTT
更新时间:2021-06-24
MQTT DDL
CREATE TABLE mqtt_table (
`field01` STRING,
`field02` BIGINT,
`field03` FLOAT,
`field04` BINARY,
`field05` INT,
`field06` TINYINT,
`field07` BOOLEAN,
`field08` DATA,
`field09` DOUBLE,
`field10` SMALLINT
) WITH (
'connector.type' = 'MQTT',
'format.encode' = 'JSON',
'connector.emit' = 'BATCH',
'connector.url' = 'tcp://xxxxx.mqtt.iot.gz.baidubce.com:1883',
'connector.topic' = 'mqtt-topic',
'connector.username' = 'xxxxx/yyyy',
'connector.password' = 'xxxxxxxxxx',
'connector.semantic' = 'at_least_once',
'connector.net.connection-timeout' = '60',
'connector.net.keep-alive-interval' = '30',
'connector.write.max-message-num-per-batch' = '1000',
'connector.write.max-message-size-per-batch' = '1000'
);
MQTT 参数设置
名称 | 简称 | 必填 | 用例 | SPARK | FLINK | 说明 |
---|---|---|---|---|---|---|
connector.type | type | Y | MQTT |
Y | Y | 服务类型 |
format.encode | encode | Y | JSON / CSV |
Y | Y | 数据编码 |
connector.emit | emit | BATCH / STREAM , 默认 STREAM |
Y | Y | 数据输出形式,当使用BATCH时,可以提高向下游写数据的吞吐量,但带来了recover时数据更多重复的风险 | |
connector.url | url | Y | tcp://xxxxxx.mqtt.iot.gz.baidubce.com:1883 |
Y | Y | MQTT BROKER 地址 |
connector.topic | topic | Y | mqtt-topic |
Y | Y | MQTT TOPIC 名称 |
connector.username | username | Y | xxxx/yyy |
Y | Y | MQTT 用户名称 |
connector.password | password | Y | xxxxxxxxxxxx |
Y | Y | MQTT 用户密码 |
connector.semantic | semantic | at_leaset_once / at_most_once , 默认 at_leaset_once |
Y | MQTT 支持语义 | ||
connector.net.connection-timeout | connection-timeout | 60 |
Y | MQTT 连接超时时间(单位秒) | ||
connector.net.keep-alive-interval | keep-alive-interval | 30 |
Y | MQTT 连接保活时长(单位秒) | ||
connector.write.max-message-num-per-batch | max-message-num-per-batch | 1000 |
Y | MQTT 写入消息每次最大条数(仅emit=BATCH时有效) | ||
connector.write.max-message-size-per-batch | max-message-size-per-batch | 1000 |
Y | MQTT 写入消息每次最大字节数(仅emit=BATCH时有效) |