TSDB
所有文档

          百度流式计算 BSC

          TSDB

          TSDB DDL

          CREATE TABLE sink_tsdb_table (
               `datapoints` ARRAY < ROW(
                  `timestamp` BIGINT,
                  `metric` STRING,
                  `value` BIGINT,
                  `tags` MAP < STRING, STRING >
              ) >
          ) WITH (
              'connector.type' = 'TSDB',
              'connector.emit' = 'batch',
              'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
              'connector.net.max-connection-total' = '10',
              'connector.net.max-connection-per-route' = '10',
              'connector.net.connection-timeout-ms' = '60000',
              'connector.write.max-message-num-per-batch' = '1000'
          );
          CREATE TABLE TsdbTable (
              `field` STRING
          ) WITH (
              'connector.type' = 'TSDB',
              'connector.emit' = 'batch',
              'connector.url' = 'http://xxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
              'connector.net.max-connection-total' = '10',
              'connector.net.max-connection-per-route' = '10',
              'connector.net.connection-timeout-ms' = '60000',
              'connector.write.max-message-num-per-batch' = '1000',
              'connector.write.max-message-size-per-batch' = '1024'
          );

          TSDB 参数设置

          名称 简称 是否必填 用例 SPARK FLINK 说明
          connector.type type Y TSDB Y Y 服务类型
          connector.emit emit BATCH / STREAM, 默认 STREAM Y Y 数据输出形式
          connector.url url Y http://xxxxxxx.tsdb.iot.gz.baidubce.com Y Y TSDB 服务地址
          connector.net.max-connection-total max-connection-total 10 Y Y TSDB 最大链接总数
          connector.net.max-connection-per-route max-connection-per-route 10 Y Y TSDB 每个路由最大的链接数
          connector.net.connection-timeout-ms connection-timeout-ms 60000 Y Y TSDB 读取消息超时时间(单位毫秒)
          connector.write.max-message-num-per-batch max-message-num-per-batch 1000 Y Y TSDB 写入消息每次最大条数(仅emit=BATCH时有效)
          connector.write.max-message-size-per-batch max-message-size-per-batch 1024 Y TSDB 写入消息每次最大字节数(仅emit=BATCH时有效)

          TSDB 示例说明

          TSDB接收固定格式的数据点,其结构为:

          参数名称 参数类型 是否必须 说明
          datapoints List 必须 datapoint列表,由Datapoint对象组成的数组

          Datapoint对象

          参数名称 参数类型 是否必须 说明
          metric String 必须 metric的名称
          field String field的名称,默认名称为value。不同的field支持不同的数据类型写入。对于同一个field,如果写入了某个数据类型的value之后,相同的field不允许写入其他数据类型
          tags Object 必须 data point对应的所有tag,Object中的一对key-value表示一个tag的key-value
          type String 目前支持Long/Double/String/Bytes。代表value字段的类型,如果不填会根据解析出来的类型为准。bytes是种特殊类型,表示value是经过base64编码后的String,TSDB存储时会反编码成byte数组存储
          timestamp int Unix时间戳,单位是毫秒;如果timestamp为空,value不为空,timestamp自动填入系统当前时间;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000;timestamp+value与values两者必须二选一
          value Int/Double/String data point的值,timestamp+value与values两者必须二选一。当写入的metric、field、tags、timestamp都相同时,后写入的value会覆盖先写入的value
          values List<List> 对于相同的metric+tags的data point,可以通过合并成一个values的List来减少payload,values是个二维数组,里面的一维必须是两个元素,第一个元素是timestamp,是unix时间戳,类型是Int,第二个元素是value,类型是Int/Double/String;如果timestamp的位数小于等于10位,将认为精度是秒,自动乘以1000

          TSDB 需要的数据点集合

          TSDB 接受的数据点集合可以认为是一个嵌套 json,SPARK 需要在 DML 中完成这个 JSON 的组装,并将其转换为 STRING。而 FLINK 在 DML 中仅完成 List 的组装,由 SINK 算子隐式转换成 JSON 字符串,因此FLINK sink table 有固定的书写格式。下面示例可以看出,TSDB 允许每个 datapoint 采用不同的格式,但在 BSC SQL 流式计算中,我们往往要求每个SQL语句中仅使用某一种格式。

          {
              "datapoints": [{
                  "metric": "cpu_idle",
                  "tags": {
                      "host": "server1",
                      "rack": "rack1"
                  },
                  "timestamp": 1465376157007,
                  "value": 51
              }, {
                  "metric": "cpu_idle",
                  "tags": {
                      "host": "server2",
                      "rack": "rack2"
                  },
                  "values": [
                      [1465376269769, 67],
                      [1465376325057, 60]
                  ]
              }]
          }

          具体的实现如下:

          FLINK sink table 中字段名必须为 datapoints,类型必须为 ARRAY,ARRAY 中每个 ROW 代表一个 datapoints,因此sql 处理后产生的datapoints 要求格式保持一致。

          SPARK sink table 对字段名称没有要求,因为 STRING 的内容就是一个TSDB 接受的 JSON 字符串,已经在 DML 中完成了描述

          CREATE TABLE sink_tsdb_table (
              `datapoints` ARRAY < ROW(
                  `timestamp` BIGINT,
                  `metric` STRING,
                  `value` BIGINT,
                  `tags` MAP < STRING, STRING >
              ) >
          ) WITH (
              'connector.type' = 'TSDB',
              'format.encode' = 'JSON',
              'connector.emit' = 'BATCH',
              'connector.url' = 'http://bscwgtest.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
              'connector.write.max-message-num-per-batch' = '2000'
          );
          
          INSERT INTO
              tsdb_sink
          SELECT
              ARRAY [
                  ROW(`timestamp`, `count_name` , `count`, `common_tags`),
                  ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
                  ROW(`timestamp`, `latency_name`, `latency`, `common_tags`)
              ]
          FROM
              (
                  SELECT
                      `timestamp`,
                      'count' AS `count_name`,
                      `count`,
                      'traffic' AS `traffic_name`,
                      `traffic`,
                      'latency' AS `latency_name`,
                      `latency`,
                      MAP ['apiUuid', `apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags`
                  FROM
                      (
                          SELECT
                              TO_BIGINT(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`,
                              COUNT(1) AS `count`,
                              SUM(contentLength) AS `traffic`,
                              SUM(latency) AS `latency`,
                              apiUuid AS `apiUuid`,
                              groupUuid AS `groupUuid`,
                              CASE
                                  WHEN status >= 200
                                  AND status < 300 THEN '2xx'
                                  WHEN status >= 300
                                  AND status < 200 THEN '3xx'
                                  WHEN status >= 400
                                  AND status < 500 THEN '4xx'
                                  WHEN status >= 500
                                  AND status < 600 THEN '5xx'
                                  ELSE 'oth'
                              END AS `status`
                          FROM
                              source_table
                          GROUP BY
                              TUMBLE(`timestamp`, INTERVAL '1' MINUTE),
                              apiUuid,
                              groupUuid,
                              status
                      )
              )
          CREATE TABLE sink_tsdb_table (
              stringtype STRING
          ) WITH (
              'connector.type' = 'TSDB',
              'format.encode' = 'JSON',
              'connector.emit' = 'BATCH',
              'connector.url' = 'http://bscwgtest.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
              'connector.write.max-message-num-per-batch' = '2000'
          );
          
          INSERT INTO
              sink_table_tsdb(integertype) 
          select
              to_json(
                  named_struct(
                      'datapoints',
                      array(
                          named_struct(
                              'metric', 'xxxx',
                              'timestamp', max(`timestampfield`),
                              'value', count(`integerfield`),
                              'tags', map('isRoot', 'true')
                          )
                      )
                  )
              )
          from
              source_table_kafka
          group by 
              `bytefield` ;
          上一篇
          PALO
          下一篇
          BKAFKA