规则引擎Rule Engine

    作业定义

    一个Creek作业由一个JSON文档完整的描述。其定义了作业的输入、输出、时间类型、计算逻辑、自定义函数等等。下面为一个作业定义的示例:

    示例

    {
    	"sources": [{
    		"schema": {
    			"format": "CSV",
    			"fields": [{
    					"name": "ts",
    					"type": "SQL_TIMESTAMP"
    				},
    				{
    					"name": "deviceid",
    					"type": "STRING"
    				},
    				{
    					"name": "temp",
    					"type": "DOUBLE"
    				}
    			]
    		},
    		"watermark": 0,
    		"name": "mysrc",
    		"eventTime": "ts",
    		"type": "MQTT",
    		"attr": {
    			"host": "tcp://yuanyoujun.mqtt.iot.bj.baidubce.com:1883",
    			"topic": "creek/input/#",
    			"user": "<your mqtt user name here>",
    			"pass": "<your mqtt password here>"
    		}
    	}],
    	"sink": {
    		"schema": {
    			"format": "JSON"
    		},
    		"name": "mysink",
    		"type": "STDOUT"
    	},
    	"name": "demojob",
    	"timeType": "EVENTTIME",
    	"sql": "INSERT INTO mysink SELECT TUMBLE_START(rowtime, INTERVAL '10' SECOND) as ts, deviceid, RepeatString(deviceid, 2), AVG(temp) AS avg_temperature  FROM mysrc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), deviceid",
    	"udfInfo": [{
    		"name": "myudf.go",
    		"content": "H4sIAAAAAAAA/21STWvcMBA9W79i8KGxieMNIfSwzR7aEkghPTS7JfQo22OvWlky0qhlCfvfO5JtSkp0sWfee/PxpM0GJtn+kgOCkSPC1+/7AzQIeej6XKwQB0JsNvDDBmilAWv0CdQ4WUfQOzuCP3nCEbRqnHSnCowFOirXXU3S0SnmxUwvRJZ7csoMPudfNK3tONgMtslFKUQfTAvKKCpKeBHAh5H6CQfFDVzx7mCnZ+u6j20bxqAlWfdyLsU5Trd78/hWaukg1iVlDXg5Thrf5qYdn3BCSfs0IygPEv4vQUdJ4CLNU8Jnbm8dMKhGrGMdzkY5HXElkI3OJiFhV0WSWSlRFgkzWsOXnrG7HVxXwH7jOLGLS5k/Suu5EAVnsEvdYnNlpkBbKPIHlVdwU1YzdUACTnEyMQ92UfLkRmn4LXXACn4GTytgrLmKEDpnXRLtvz0uzm1hf/94//nwyqfi4kFdVPw0PBU3ID00alCGyrKeL/QVNxozr1LF/Q29vy2hWDOpZ7z7zDew3S1MX38KSnfIty2ytGvEkra4Lj9weAeGP5eXUcnS+tkpwn8dS5HxI8mWBRlfIDaJNxXnv2ziM78IAwAA"
    	}]
    }

    各部分说明

    sources

    定义了输入数据源,可以有多个。一个数据源定义了数据来源、格式、schema、水印和名称。

    字段名称 说明
    shema 数据的格式,字段类型等,详见schema说明
    watermark 以秒为单位的watermark
    name 数据源的名称,在SQL字段中引用
    eventTime 事件时间字段名,schema中为消息时间戳的field的name,没有事件时间则留空
    type 数据源类型,可能取值:STDIN(标准输入), MQTT, FILE, KAFKA, COLLECTION(从attr.input读取)
    attr 可选字段,指定不数据源相关的参数,示例中用来指定MQTT连接的相关信息,详情参考数据源和目的地

    schema说明

    字段名称 说明
    format 消息的格式,可能取值: CSV, JSON,TXT_BY_GROK
    formatAttr 可选,指定格式相关参数,例如CSV的分隔符,TXT_BY_GROK的pattern等
    fields 数组,每个元素指定列名和类型,类型参考

    完整的schema定义参考这里

    sink

    指定了计算结果输出到哪里去,以及输出的格式等。

    字段名称 说明
    schema 仅需指定输出的格式,format字段可能取值:CSV, JSON
    name 名称,SQL中引用,通常以INSERT INTO sinkname方式引用
    type 输出的类型,可能取值: STDOUT(标准输出), MQTT, FILE, KAFKA
    attr 可选字段,用来指定数据目的地的其它属性,例如Kafka的broker, topic等信息,详情参考数据源和目的地

    name

    作业的名称。如果使用了Kafka数据源的话,亦作为默认的consumer group id。

    timeType

    作业的时间类型,可能取值:EVENTTIME(事件时间), PROCESSTIME(处理时间)

    sql

    作业的SQL定义,与Flink SQL完全兼容

    udfInfo

    自定义函数列表。每个元素指定name,为源代码文件名,唯一且不能取名为pkginit.go或者systemudf.go;content为go语言源代码文件经过gzip后在进行base64编码的文本。可以在TxtWizard网址进行在线编解码。

    一篇
    简介
    一篇
    数据源和目的地