TSDB
更新时间:2023-12-07
TSDB DDL
CREATE TABLE sink_tsdb_table (
`datapoints` ARRAY < ROW(
`timestamp` BIGINT,
`metric` STRING,
`value` BIGINT,
`tags` MAP < STRING, STRING >
) >
) WITH (
'connector.type' = 'TSDB',
'connector.emit' = 'batch',
'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
'connector.net.max-connection-total' = '10',
'connector.net.max-connection-per-route' = '10',
'connector.net.connection-timeout-ms' = '60000',
'connector.write.max-message-num-per-batch' = '1000'
);
CREATE TABLE TsdbTable (
`field` STRING
) WITH (
'connector.type' = 'TSDB',
'connector.emit' = 'batch',
'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
'connector.net.max-connection-total' = '10',
'connector.net.max-connection-per-route' = '10',
'connector.net.connection-timeout-ms' = '60000',
'connector.write.max-message-num-per-batch' = '1000',
'connector.write.max-message-size-per-batch' = '1024'
);
TSDB 参数设置
名称 | 简称 | 是否必填 | 用例 | SPARK | FLINK | 说明 |
---|---|---|---|---|---|---|
connector.type | type | Y | TSDB |
Y | Y | 服务类型 |
connector.emit | emit | BATCH / STREAM , 默认 STREAM |
Y | Y | 数据输出形式,当使用BATCH时,可以提高向下游写数据的吞吐量,但带来了recover时数据更多重复的风险 | |
connector.url | url | Y | http://xxxxxxx.tsdb.iot.gz.baidubce.com |
Y | Y | TSDB 服务地址 |
connector.ak | ak | Y | ak |
Y | Y | 用户永久ak |
connector.sk | sk | Y | sk |
Y | Y | 用户永久sk |
connector.net.max-connection-total | max-connection-total | 10 |
Y | Y | TSDB 最大链接总数 | |
connector.net.max-connection-per-route | max-connection-per-route | 10 |
Y | Y | TSDB 每个路由最大的链接数 | |
connector.net.connection-timeout-ms | connection-timeout-ms | 60000 |
Y | Y | TSDB 读取消息超时时间(单位毫秒) | |
connector.write.max-message-num-per-batch | max-message-num-per-batch | 1000 |
Y | Y | TSDB 写入消息每次最大条数(仅emit=BATCH时有效) | |
connector.write.max-message-size-per-batch | max-message-size-per-batch | 1024 |
Y | TSDB 写入消息每次最大字节数(仅emit=BATCH时有效) |
TSDB 示例说明
TSDB接收固定格式的数据点,其结构为:
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
datapoints | List |
必须 | datapoint列表,由Datapoint对象组成的数组 |
Datapoint对象
参数名称 | 参数类型 | 是否必须 | 说明 |
---|---|---|---|
metric | String | 必须 | metric的名称 |
field | String | field的名称,默认名称为value。不同的field支持不同的数据类型写入。对于同一个field,如果写入了某个数据类型的value之后,相同的field不允许写入其他数据类型 | |
tags | Object | 必须 | data point对应的所有tag,Object中的一对key-value表示一个tag的key-value |
type | String | 目前支持Long/Double/String/Bytes。代表value字段的类型,如果不填会根据解析出来的类型为准。bytes是种特殊类型,表示value是经过base64编码后的String,TSDB存储时会反编码成byte数组存储 | |
timestamp | int | Unix时间戳,单位是毫秒;如果timestamp为空,value不为空,timestamp自动填入系统当前时间;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000;timestamp+value与values两者必须二选一 | |
value | Int/Double/String | data point的值,timestamp+value与values两者必须二选一。当写入的metric、field、tags、timestamp都相同时,后写入的value会覆盖先写入的value | |
values | List<List |
对于相同的metric+tags的data point,可以通过合并成一个values的List来减少payload,values是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是unix时间戳,类型是Int,第二个元素是value,类型是Int/Double/String;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000 |
TSDB 需要的数据点集合
TSDB 接受的数据点集合可以认为是一个嵌套 json,SPARK 需要在 DML 中完成这个 JSON 的组装,并将其转换为 STRING。而 FLINK 在 DML 中仅完成 List
{
"datapoints": [{
"metric": "cpu_idle",
"tags": {
"host": "server1",
"rack": "rack1"
},
"timestamp": 1465376157007,
"value": 51
}, {
"metric": "cpu_idle",
"tags": {
"host": "server2",
"rack": "rack2"
},
"values": [
[1465376269769, 67],
[1465376325057, 60]
]
}]
}
FLINK sink table 中字段名必须为 datapoints,类型必须为 ARRAY,ARRAY 中每个 ROW 代表一个 datapoints,因此sql 处理后产生的datapoints 要求格式保持一致。
SPARK sink table 对字段名称没有要求,因为 STRING 的内容就是一个TSDB 接受的 JSON 字符串,已经在 DML 中完成了描述
CREATE TABLE sink_tsdb_table (
`datapoints` ARRAY < ROW(
`timestamp` BIGINT,
`metric` STRING,
`value` BIGINT,
`tags` MAP < STRING, STRING >
) >
) WITH (
'connector.type' = 'TSDB',
'format.encode' = 'JSON',
'connector.emit' = 'BATCH',
'connector.url' = 'http://bscwgtest.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
'connector.write.max-message-num-per-batch' = '2000'
);
INSERT INTO
tsdb_sink
SELECT
ARRAY [
ROW(`timestamp`, `count_name` , `count`, `common_tags`),
ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
ROW(`timestamp`, `latency_name`, `latency`, `common_tags`)
]
FROM
(
SELECT
`timestamp`,
'count' AS `count_name`,
`count`,
'traffic' AS `traffic_name`,
`traffic`,
'latency' AS `latency_name`,
`latency`,
MAP ['apiUuid', `apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags`
FROM
(
SELECT
TO_BIGINT(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`,
COUNT(1) AS `count`,
SUM(contentLength) AS `traffic`,
SUM(latency) AS `latency`,
apiUuid AS `apiUuid`,
groupUuid AS `groupUuid`,
CASE
WHEN status >= 200
AND status < 300 THEN '2xx'
WHEN status >= 300
AND status < 200 THEN '3xx'
WHEN status >= 400
AND status < 500 THEN '4xx'
WHEN status >= 500
AND status < 600 THEN '5xx'
ELSE 'oth'
END AS `status`
FROM
source_table
GROUP BY
TUMBLE(`timestamp`, INTERVAL '1' MINUTE),
apiUuid,
groupUuid,
status
)
)
CREATE TABLE sink_tsdb_table (
stringtype STRING
) WITH (
'connector.type' = 'TSDB',
'format.encode' = 'JSON',
'connector.emit' = 'BATCH',
'connector.url' = 'http://bscwgtest.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
'connector.write.max-message-num-per-batch' = '2000'
);
INSERT INTO
sink_table_tsdb(integertype)
select
to_json(
named_struct(
'datapoints',
array(
named_struct(
'metric', 'xxxx',
'timestamp', max(`timestampfield`),
'value', count(`integerfield`),
'tags', map('isRoot', 'true')
)
)
)
)
from
source_table_kafka
group by
`bytefield` ;