查询操作
更新时间:2019-08-20
获取度量(Metric)
如下代码可以获取metric列表:
// 获取Metric
GetMetricsResponse response = tsdbClient.getMetrics();
// 打印结果
for(String metric : response.getMetrics()) {
System.out.println(metric);
}
获取Field
如下代码可以获取Field列表:
String METRIC = "metric";
// 获取Field
GetFieldsResponse response = tsdbClient.getFields(METRIC);
// 打印结果
for(Map.Entry<String, FieldInfo> entry : response.getFields().entrySet()) {
System.out.println(entry.getKey() + ":");
System.out.println("\t" + entry.getValue().getType());
}
获取标签(Tag)
如下代码可以获取标签(Tag)列表:
String METRIC = "cpu_idle"; // 设置需要获取tag的metric
// 获取标签
GetTagsResponse response = tsdbClient.getTags(METRIC);
// 打印结果
for(Map.Entry<String, List<String>> entry : response.getTags().entrySet()) {
System.out.println(entry.getKey() + ":");
for(String tagValue : entry.getValue()) {
System.out.println("\t" + tagValue);
}
}
查询数据点
如下代码可以查询数据点:
String metric = "wind";
String field = "direction";
// 构造查询对象
List<Query> queries = Arrays.asList(new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("2 hours ago")) // 设置相对的开始时间,这里设置为2小时前
.withLimit(100) // 设置返回数据点数目限制
.addAggregator(new Aggregator() // 创建Aggregator对象
.withName(TsdbConstants.AGGREGATOR_NAME_SUM) // 设置聚合函数为Sum
.withSampling("1 second"))); // 设置采样
// 查询数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(queries);
// 打印结果
for (Result result : response.getResults()) {
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
for (GroupInfo groupInfo : group.getGroupInfos()) {
System.out.println("\t\tGroupInfo:");
System.out.println("\t\t\tName:" + groupInfo.getName());
}
System.out.println("\t\tTimeAndValue:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
if (timeAndValue.isDouble()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getDoubleValue() + "]");
} else if (timeAndValue.isLong()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getLongValue() + "]");
} else {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getStringValue() + "]");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
多域查询数据点
如下代码可以查询数据点:
tring metric = "wind";
String field1 = "direction";
String field2 = "speed";
String tag = "city";
// 构造查询对象
List<String> fields = Arrays.asList(field1, field2);
List<Query> queries = Arrays.asList(new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withFields(fields) // 设置查询的域名列表,不设置表示查询默认域,和field冲突
.withTags(Arrays.asList(tag)) // 设置查询的Tag列表,不设置表示不查询,
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("2 hours ago") // 设置相对的开始时间,这里设置为2小时前
.withRelativeEnd("1 second ago")) // 设置相对的结束时间,不设置则默认为到当前时间为止
.withLimit(100));
// 查询数据,返回结果的顺序和请求的field顺序相同
QueryDatapointsResponse response = tsdbClient.queryDatapoints(queries);
// 打印结果
for (Result result : response.getResults()) {
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
for (GroupInfo groupInfo : group.getGroupInfos()) {
System.out.println("\t\tGroupInfo:");
System.out.println("\t\t\tName:" + groupInfo.getName());
}
System.out.println("\t\tTimeAndValues:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
System.out.print("\t\t\t[" + timeAndValue.getTime());
for (int index = 0; index < fields.size(); index++) {
System.out.print(", ");
if (!timeAndValue.isNull(index)) {
if (timeAndValue.isDouble(index)) {
System.out.print(timeAndValue.getDoubleValue(index));
} else if (timeAndValue.isLong(index)) {
System.out.print(timeAndValue.getLongValue(index));
} else {
System.out.print(timeAndValue.getStringValue(index));
}
} else {
System.out.print("null");
}
}
System.out.println("]");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
查询数据点和对应的tags
如下代码可以查询数据点的同时返回对应的tags信息:
String metric = "wind";
String field = "direction";
String tagKey1 = "city";
String tagKey2 = "province"; // 构造查询对象
List<Query> queries = Arrays.asList(new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withTags(Arrays.asList(tagKey1, tagKey2)) // 设置需要同时返回的tag key列表
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("2 hours ago")) // 设置相对的开始时间
.withLimit(100)); // 设置返回数据点数目限制
// 查询数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(queries);
// 打印结果
for (Result result : response.getResults()) {
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
for (GroupInfo groupInfo : group.getGroupInfos()) {
System.out.println("\t\tGroupInfo:");
System.out.println("\t\t\tName:" + groupInfo.getName());
}
System.out.println("\t\tTimeAndValue:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
// 打印的格式为 [时间,FIELD_VALUE,TAG_1_VALUE,TAG_2_VALUE]
if (timeAndValue.isDouble()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getDoubleValue(0) + "," + timeAndValue.getStringValue(1) + "," + timeAndValue.getStringValue(2) + "]");
} else if (timeAndValue.isLong()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getLongValue(0) + "," + timeAndValue.getStringValue(1) + "," + timeAndValue.getStringValue(2) + "]");
} else {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getStringValue(0) + "," + timeAndValue.getStringValue(1) + "," + timeAndValue.getStringValue(2) + "]");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
使用插值方式查询数据点
如下代码可以对数据点进行插值:
String metric = "wind";
String field = "direction";
// 构造查询对象
List<Query> queries = Arrays.asList(new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("2 hours ago") // 设置相对的开始时间
.withRelativeEnd("1 second ago")) // 设置相对的结束时间,不设置则默认为到当前时间为止
.withFill(new Fill()
.withType(TsdbConstants.FILL_TYPE_LINEAR) // 设置插值类型,这里设置成线性插值
.withInterval("5 minutes") // 设置插值间隔
.withMaxWriteInterval("10 minutes"))); // 设置最大写入间隔
// 查询数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(queries);
// 打印结果
for (Result result : response.getResults()) {
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
for (GroupInfo groupInfo : group.getGroupInfos()) {
System.out.println("\t\tGroupInfo:");
System.out.println("\t\t\tName:" + groupInfo.getName());
}
System.out.println("\t\tTimeAndValue:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
if (timeAndValue.isDouble()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getDoubleValue() + "]");
} else if (timeAndValue.isLong()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getLongValue() + "]");
} else {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getStringValue() + "]");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
分页查询原始数据点
TSDB支持两种分页查询的方式,一种是marker + limit的方式,一种是offset + limit的方式。 marker + limit的方式通过marker能在索引层确定查询的起始位置,因此查询速度更快,查询成本消耗更少。 offset + limit的方式需要将查询执行完成后,才能找到结果集offset的位置,因此查询成本消耗相对较大。当offset值过大时,查询返回耗时可能变大。
如下代码为marker + limit的分页查询原始数据点:
String metric = "wind";
String field = "direction";
// 构造查询对象
Query query = new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("1 week ago") // 设置相对的开始时间,这里设置为1周前
.withRelativeEnd("1 second ago")); // 设置相对的结束时间,不设置则默认为到当前时间为止
while (true) {
// 查询数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(Arrays.asList(query));
// 打印结果
Result result = response.getResults().get(0);
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
for (GroupInfo groupInfo : group.getGroupInfos()) {
System.out.println("\t\tGroupInfo:");
System.out.println("\t\t\tName:" + groupInfo.getName());
}
System.out.println("\t\tTimeAndValue:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
if (timeAndValue.isDouble()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getDoubleValue() + "]");
} else if (timeAndValue.isLong()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getLongValue() + "]");
} else {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getStringValue() + "]");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 如果没有下一页数据,则退出循环
if (!result.isTruncated()) {
break;
}
// 设置下一页数据的marker
query.setMarker(result.getNextMarker());
}
如下是offset + limit的分页查询原始数据点:
String metric = "wind";
String field = "direction";
int pageSize = 10;
Query query = new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("1 week ago") // 设置相对的开始时间,这里设置为1周前
.withRelativeEnd("1 second ago")) // 设置相对的结束时间,不设置则默认为到当前时间为止
.withLimit(pageSize) // 设置每页大小为10
.withOffset(0); // 查询第1页的数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(Arrays.asList(query));
// 查询第2页的数据
query.setOffset(pageSize * 1);
response = tsdbClient.queryDatapoints(Arrays.asList(query));
// 查询第3页的数据
query.setOffset(pageSize * 2);
response = tsdbClient.queryDatapoints(Arrays.asList(query));
查询数据时使用标签过滤
示例代码如下:
String metric = "wind";
String field = "direction";
// 构造查询对象
List<Query> queries = Arrays.asList(new Query() // 创建Query对象
.withMetric(metric) // 设置metric
.withField(field) // 设置查询的域名,不设置表示查询默认域
.withFilters(new Filters() // 创建Filters对象
.withRelativeStart("2 hours ago") // 设置相对的开始时间,这里设置为2小时前
.withRelativeEnd("1 second ago") // 设置相对的结束时间,不设置则默认为到当前时间为止
.addTagFilter(new TagFilter() // 创建TagFilter对象
.withTag("tag1") // 设置要过滤的tagKey
.addNotIn("value2")))); // 设置不允许出现的tagValue。还可以设置允许出现的tagValue,或者设置like匹配。
// 查询数据
QueryDatapointsResponse response = tsdbClient.queryDatapoints(queries);
// 打印结果
for (Result result : response.getResults()) {
System.out.println("Result:");
for (Group group : result.getGroups()) {
System.out.println("\tGroup:");
System.out.println("\t\tTimeAndValue:");
try {
for (Group.TimeAndValue timeAndValue : group.getTimeAndValueList()) {
if (timeAndValue.isDouble()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getDoubleValue() + "]");
} else if (timeAndValue.isLong()) {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getLongValue() + "]");
} else {
System.out.println("\t\t\t[" + timeAndValue.getTime() + "," + timeAndValue.getStringValue() + "]");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Filters对象设置示例
// 使用绝对时间
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000);
// 添加Tags:逐个添加
Filters filters2 = new Filters()
.withRelativeStart("5 minutes ago")
.withRelativeEnd("2 minutes ago")
.addTag("tagKey1", "tagValue1", "tagValue2")
.addTag("tagKey2", "tagValue1");
// 添加Tags:通过map添加
Map<String, List<String>> tags = new HashMap<String, List<String>>(); // 创建tags
tags.put("tagKey1", Arrays.asList("tagValue1", "tagValue2")); // 添加tag
tags.put("tagKey2", Arrays.asList("tagValue1")); // 添加tag
Filters filters3 = new Filters()
.withRelativeStart("5 minutes ago")
.withRelativeEnd("2 minutes ago")
.withTags(tags);
按值过滤的设置示例
// 对long型数据进行过滤,支持的比较符为 >,<,>=,<=,= 和 !=
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withValue(ValueFilter.createValueFilter(ValueFilter.LESS_OR_EQUAL, 100L)); // 过滤条件为 "<= 100"
// 对double型数据进行过滤,支持的比较符为 >,<,>=,<=,= 和 !=
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withValue(ValueFilter.createValueFilter(ValueFilter.GREATER, 99.9)); // 过滤条件为 "> 99.9"
// 对String型数据进行过滤,支持的比较符为 >,<,>=,<=,= 和 !=
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withValue(ValueFilter.createValueFilter(ValueFilter.EQUAL, "stringvalue")); // 过滤条件为 "= 'stringvalue'"
// 将数据与标签tagKey1的值进行比较过滤,支持的比较符为 >,<,>=,<=,= 和 !=
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withValue(ValueFilter.createValueFilterOfTag(ValueFilter.EQUAL, "tagKey1")); // 过滤条件为 "= tagKey1"
// 对所有多域数据使用统一value进行过滤,支持的比较符为 >,<,>=,<=,= 和 !=
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withValue(ValueFilter.createValueFilter(ValueFilter.LESS_OR_EQUAL, 100L)); // 过滤条件为 "<= 100"
// 对多域数据使用不同的value进行过滤,支持的比较符为 >,<,>=,<=,= 和 !=
FieldFilter fieldFilter1 = new FieldFilter("field1", ValueFilter.createValueFilter("<", 100L));
FieldFilter fieldFilter2 = new FieldFilter("field2", ValueFilter.createValueFilter(">", 200L));
Filters filters1 = new Filters()
.withAbsoluteStart(System.currentTimeMillis() - 5000)
.withAbsoluteEnd(System.currentTimeMillis() - 1000)
.withFields(Arrays.asList(fieldFilter1, fieldFilter2)); // 过滤条件为 "field1< 100 && field2 > 200"
查询涉及的对象
查询涉及到的对象包括:Query对象,Filters对象,GroupBy对象和Aggregator对象,关于对象的具体介绍,请参看API参考中的[查询data point](TSDB/API参考/数据API接口说明.md#查询data point)。
SQL查询接口
v0.10.32版本的SDK中,支持了sql的查询。示例如下:
String METRIC = "wind";
String FIELD = "direction";
String sql = "select " + FIELD + " from " + METRIC;
// 查询数据
GetRowWithSqlResponse response = tsdbClient.getRowsWithSql(sql);
// 打印表头
for (Column column : response.getColumns()) {
System.out.print(column.getName() + "\t");
}
System.out.println();
// 打印数据行
for (List<Object> row : response.getRows()) {
for (Object item : row) {
System.out.print(item.toString() + "\t");
}
System.out.println();
}