所有文档

          规则引擎 Rule Engine

          数据源和目的地

          Creek支持丰富的数据源和目的地,方便与多种上下游对接。目前支持的connector包括如下几种,并且还在持续的增加中。

          MQTT数据源和目的地

          MQTT数据源

          MQTT数据源支持以MQTT协议订阅指定主题的消息,作为流式作业的输入。在source中的type值为:MQTT

          该数据源通过attr对象指定相关的参数:

          属性 说明
          host MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883
          topic MQTT主题,支持通配符,例如:creek/input/#
          user [可选]MQTT连接用户名,如: myusername
          pass [可选]MQTT连接密码,如: mypass123
          clientId [可选]MQTT连接的clientid,默认creekmqtt_src<系统时间(毫秒)>
          qos [可选]MQTT订阅的服务质量,可能取值:0,1,2, 默认为1

          MQTT目的地

          计算结果发送到指定的MQTT主题。在sink中的type值为:MQTT

          该数据目的地通过attr对象指定相关的参数:

          属性 说明
          host MQTT broker的地址,包括协议和端口,如tcp://creek.mqtt.iot.bj.baidubce.com:1883
          topic MQTT主题,不支持通配符,例如:creek/output
          user [可选]MQTT连接用户名,如: myusername
          pass [可选]MQTT连接密码,如: mypass123
          clientId [可选]MQTT连接的clientid,默认creekmqtt_sink<系统时间(毫秒)>
          qos [可选]MQTT发布的服务质量,可能取值:0,1,2,默认为1
          retained [可选]MQTT发布的消息是否设置retained标志,默认为false

          KAFKA数据源和目的地

          KAFKA数据源

          从一个或者多个Kafka主题订阅消息,作为流式作业的输入。在source中的type值为:KAFKA

          该数据源通过attr对象指定相关的参数:

          属性 说明
          broker kafka broker的服务器地址列表,例如:127.0.0.1:9092
          topic 订阅的一个或者多个主题,多种主题通过逗号分隔。例如:topic1,topic2
          version Kafka Broker的版本号,字符串类型。例如:1.0.2
          auto.offset.reset [可选]当group没有提交过offset,系统从主题的当前位置,还是最早的位置开始消费。可选earliest和largest,默认为largest

          示例

          "attr": {
          	"version": "1.0.2",
          	"topic": "topic1,topic2",
          	"auto.offset.reset": "earliest",
          	"broker": "127.0.0.1:9092"
          }

          KAFKA目的地

          计算结果输出到指定的Kafka主题。在sink中的type值为:KAFKA

          该数据目的地通过attr对象指定相关的参数:

          属性 说明
          broker kafka broker的服务器地址列表,例如:127.0.0.1:9092
          topic 目的kafka主题,例如:topic1,数据均衡的发送到各个partition

          FILE数据源和目的地

          FILE数据源

          从一个文本文件中,逐行读入数据,作为流式作业的输入,一行对应一条记录,如果tail为false,读取到文件末尾,则退出作业。在source中的type值为:FILE。

          该数据源通过attr对象指定相关的参数:

          属性 说明
          input 输入文件的路径
          tail [可选]是否跟踪文件的变化,如果设置为true,则会持续监控文件的更新,包括文件的rotate,永不结束。默认为false

          FILE目的地

          计算结果输出到指定的文件,每条记录以换行符分隔,文件不存在会自动创建,总是以APPEND方式写,在故障恢复期间,可能会截断文件,以实现exactly once的处理。在sink中的type值为:FILE。

          该数据目的地通过attr对象指定相关的参数:

          属性 说明
          output 目标文件的路径

          标准输入和输出

          标准输入STDIN

          为了方便调试和测试,可以使用STDIN的输入类型,作业启动后从命令行读入数据,一行一条,当遇到EOF(linux和mac:ctrl+d, windows: ctrl+z)时,作业有序退出。在source中的type值为:STDIN

          该数据源类型没有其他参数。

          标准输出STDOUT

          为了方便调试和测试,可以使用STDOUT的输出类型,作业的计算结果打印到标准屏幕,每条以换行符分隔。在sink中的type值为:STDOUT

          该数据目的地没有其他参数。

          固定输入COLLECTION

          为了方便调试和测试,当需要给定固定的输入数据集时,可以使用COLLECTION类型的source。作业运行起来从source的input数组中读取数据,处理完成后,退出作业。在source中的type值为:COLLECTION

          该数据源通过attr对象指定相关的参数:

          属性 说明
          input 应该为字符串数组

          示例

          "attr": {
                      "input": [
                          "2019-11-25 11:50:13.174,device01,23.5",
                          "2019-11-25 11:51:14.174,device02,24.9"
                      ]
                  }
          上一篇
          作业定义
          下一篇
          运行时设置