数据源和目的地
数据源是流式计算的输入,数据目的地是流式计算的输出。目前支持如下几种数据源和目的地。
物接入(MQTT)
消息来自于或者发往物接入,需指定实例名和MQTT主题。
百度消息服务(Kafka)
消息来自或者发往百度消息服务,需指定Kafka主题。
时序数据库(TSDB)
作为目的地
计算结果直接写入指定的TSDB。时间戳(timestamp)、度量(metric)、值(value)还有标签(tag)需在结果消息里面指定。即输出的消息需包含如下几个字段:
- timestamp
- metric
- value
- 一个或者多个标签(tag)
下面是一个目的地为TSDB的SQL举例:
INSERT INTO my_tsdb_sink
SELECT
TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS `timestamp`,
deviceid,
AVG(temperature) AS `value`,
'devicetemperature' AS metric
FROM mysource
GROUP BY
TUMBLE(rowtime, INTERVAL '5' MINUTE),
deviceid
其中,timestamp, value均为SQL关键字,因此需要用重音符(`)引起来。deviceid为一个tag写进TSDB。
作为数据源
选择某个DB的某个度量(metric)的数据作为输入,并且可以处理已经存在的历史数据。系统会自动探测metric下所有的域和tag,并且自动生成各个字段。如果有遗漏的域或者tag,可以在定义数据源时,手动添加相关的字段。
对于名为x的域,添加的字段名应该为:fields.x,类型为相应的类型;对于名为y的tag,添加的字段名为tags.y,字段类型为STRING。
举例,假设下面是某个度量下面数据的定义:
属性名称 | 类型 |
---|---|
ts | SQL_TIMESTAMP |
fields.value | DOUBLE |
tags.device | STRING |
可以类似如下在SQL中直接引用各个域和tag字段:
insert into mysink
select
tumble_start(rowtime, INTERVAL '30' SECOND) as `timestamp`,
sum(`value`) as `value`,
device
from mysource
group by tumble(rowtime, INTERVAL '30' SECOND),
device
百度对象存储(BOS)(仅作为目的地)
计算结果写入到指定的BOS(百度对象存储)Bucket中去,每天的数据对应一个文件。具体的文件名为数据目的地的名字加上日期和一个数字下标,文件扩展名与格式一致;例如: yyj_bos_sink_2018-06-25_0.csv。
消息服务(SMS)(仅作为目的地)
计算结果直接以短信的方式发送,例如检测到异常后立即发送短信告警。该类型目的地,需要选择一个短信接收人,一个短信接收人绑定了发送短信的签名、模板、和一组手机号码。为了保证短信能正确发出,请确保流式任务输出的消息严格符合模板内容中变量的要求,即SQL语句中SELECT出的字段需要与短信模板中的变量一一对应。举个例子,假设短信接收人的短信模板内容如下:
物联网设备${devid}的温度值过高,最新值为${temp}
那么SQL语句中必须SELECT两个字段,不能多也不能少,并且分别通过AS语句命名成devid和temp。如下所示:
INSERT INTO my_sms_sink SELECT deviceid AS devid, AVG(temperature) AS temp FROM ...
函数计算(CFC)(仅作为目的地)
将计算结果作为输入,调用相关的函数计算(CFC),以便自定义对计算结果的处理。