TSDB
更新时间:2023-12-07
TSDB DDL
1CREATE TABLE sink_tsdb_table (
2 `datapoints` ARRAY < ROW(
3 `timestamp` BIGINT,
4 `metric` STRING,
5 `value` BIGINT,
6 `tags` MAP < STRING, STRING >
7 ) >
8) WITH (
9 'connector.type' = 'TSDB',
10 'connector.emit' = 'batch',
11 'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
12 'connector.net.max-connection-total' = '10',
13 'connector.net.max-connection-per-route' = '10',
14 'connector.net.connection-timeout-ms' = '60000',
15 'connector.write.max-message-num-per-batch' = '1000'
16);
1CREATE TABLE TsdbTable (
2 `field` STRING
3) WITH (
4 'connector.type' = 'TSDB',
5 'connector.emit' = 'batch',
6 'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
7 'connector.net.max-connection-total' = '10',
8 'connector.net.max-connection-per-route' = '10',
9 'connector.net.connection-timeout-ms' = '60000',
10 'connector.write.max-message-num-per-batch' = '1000',
11 'connector.write.max-message-size-per-batch' = '1024'
12);
TSDB 参数设置
名称 | 简称 | 是否必填 | 用例 | SPARK | FLINK | 说明 |
---|---|---|---|---|---|---|
connector.type | type | Y | TSDB |
Y | Y | 服务类型 |
connector.emit | emit | BATCH / STREAM , 默认 STREAM |
Y | Y | 数据输出形式,当使用BATCH时,可以提高向下游写数据的吞吐量,但带来了recover时数据更多重复的风险 | |
connector.url | url | Y | http://xxxxxxx.tsdb.iot.gz.baidubce.com |
Y | Y | TSDB 服务地址 |
connector.ak | ak | Y | ak |
Y | Y | 用户永久ak |
connector.sk | sk | Y | sk |
Y | Y | 用户永久sk |
connector.net.max-connection-total | max-connection-total | 10 |
Y | Y | TSDB 最大链接总数 | |
connector.net.max-connection-per-route | max-connection-per-route | 10 |
Y | Y | TSDB 每个路由最大的链接数 | |
connector.net.connection-timeout-ms | connection-timeout-ms | 60000 |
Y | Y | TSDB 读取消息超时时间(单位毫秒) | |
connector.write.max-message-num-per-batch | max-message-num-per-batch | 1000 |
Y | Y | TSDB 写入消息每次最大条数(仅emit=BATCH时有效) | |
connector.write.max-message-size-per-batch | max-message-size-per-batch | 1024 |
Y | TSDB 写入消息每次最大字节数(仅emit=BATCH时有效) |
TSDB 示例说明
TSDB接收固定格式的数据点,其结构为:
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
datapoints | List |
必须 | datapoint列表,由Datapoint对象组成的数组 |
Datapoint对象
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
metric | String | 必须 | metric的名称 |
field | String | field的名称,默认名称为value。不同的field支持不同的数据类型写入。对于同一个field,如果写入了某个数据类型的value之后,相同的field不允许写入其他数据类型 | |
tags | Object | 必须 | data point对应的所有tag,Object中的一对key-value表示一个tag的key-value |
type | String | 目前支持Long/Double/String/Bytes。代表value字段的类型,如果不填会根据解析出来的类型为准。bytes是种特殊类型,表示value是经过base64编码后的String,TSDB存储时会反编码成byte数组存储 | |
timestamp | int | Unix时间戳,单位是毫秒;如果timestamp为空,value不为空,timestamp自动填入系统当前时间;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000;timestamp+value与values两者必须二选一 | |
value | Int/Double/String | data point的值,timestamp+value与values两者必须二选一。当写入的metric、field、tags、timestamp都相同时,后写入的value会覆盖先写入的value | |
values | List<List |
对于相同的metric+tags的data point,可以通过合并成一个values的List来减少payload,values是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是unix时间戳,类型是Int,第二个元素是value,类型是Int/Double/String;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000 |
TSDB 需要的数据点集合
TSDB 接受的数据点集合可以认为是一个嵌套 json,SPARK 需要在 DML 中完成这个 JSON 的组装,并将其转换为 STRING。而 FLINK 在 DML 中仅完成 List
FLINK sink table 中字段名必须为 datapoints,类型必须为 ARRAY,ARRAY 中每个 ROW 代表一个 datapoints,因此sql 处理后产生的datapoints 要求格式保持一致。
SPARK sink table 对字段名称没有要求,因为 STRING 的内容就是一个TSDB 接受的 JSON 字符串,已经在 DML 中完成了描述