规则引擎Rule Engine

    自定义函数

    自定义函数可以实现Creek内置函数以外的计算逻辑。自定义函数以go语言源文件的形式提交,在作业定义中表现为一个文件名和一个经过gzip压缩后进行base64编码的字符串。

    一个作业可以提交多个源文件,每个源文件可以包含多个函数。在作业的SQL中可以直接调用同名(大小写敏感)的go函数。目前支持的函数包含以下三种:

    • 标量函数(Scalar Function)
    • 聚合函数(Aggreate Function)
    • 表函数(Table Function)

    其他限制:

    1. 源文件的package必须为udf。因此源文件的开头应为: package udf
    2. 只允许导入系统库,不支持第三方库的import
    3. 函数名、结构名和要求的方法名,需要为导出名,即首字母大写
    4. 多个源文件名不允许重复,亦不可以取名:pkginit.go或者systemudf.go

    为了与数据源中数据类型尽量保持一致,整数类型尽量用int64, 浮点数尽量用float64。返回值为了支持返回空值,通常以一个额外的error的返回类型来表示。error等于nil表示成功,否则表示返回的值为空。

    作业定义示例

    如果您手动编辑作业定义,你需要自己完成对源码的编码(也可以访问http://www.txtwizard.net/compression 进行编解码)。假如你需要增加的源文件信息如下:

    文件名: myudf.go

    文件的内容:

    package udf
    
    import(
    	"strings"
    )
    
    func RepeatString(str string, n int64) (string, error) {
    	sb := strings.Builder{}
    	for i := int64(0); i < n; i++ {
    		sb.WriteString(str)
    	}
    
    	return sb.String(), nil
    }

    作业定义应该为如下格式:

    {
    	"udfInfo": [{
    		"name": "myudf.go",
    		"content": "H4sIAAAAAAAA/0WOMQ7CMAxF5/gUVqdERRUDYgC6cAQYmEPrVhYlrZxkqnJ30lLEZPm//2RPtnnZnjC2HQC/p1GCBlX4IOx6X4AB6KJr8EYT2XBfY50pfhs7dMguHA8G9S8hkVEMzqD8E0/11vTVNfLQkswJVDcK8sJWV+/NOa8XdHmU5WJmtXoIB/pfNKASgBIKURxmviGTf+AB0gfscVsxygAAAA=="
    	}]
    ...
    }

    标量函数(Scalar Function)

    标量函数接收一条消息中的零个或者多个字段作为输入,输出一个值,如substring, sin, pow等。为了支持返回空值的情况,函数的返回值应该有两个类型:一个正常返回值类型,一个error.

    因此其具体要求为:

    1. 输入参数为0个,或者多个
    2. 返回参数为2个,一个正常返回类型,一个error类型
    3. 当函数正常返回时,error应该为nil;当函数返回空值时,error为非空

    示例

    RepeatString实现对一个字符串重复n次之后返回。接收一个字符串和一个整数n,返回string类型。函数的实现中返回的error为nil,表示函数正常返回了string。

    在SQL中,可以如下方式调用该函数:

    SELECT RepeatString(deviceid, cast(2 as bigint)) FROM ...

    源码具体实现如:

    // RepeatString is a scalar function that repeasts a string for n time.
    // str is the string to be repeated,
    // n is the time to repeat. If n <= 0, an empty string will be returned.
    // for input: ("Hi", 2), will get "HiHi".
    // To return a nil value, just return non-nil error.
    // SQL sample: SELECT RepeatString('Hi', cast(2 as bigint)).
    func RepeatString(str string, n int64) (string, error) {
    	sb := strings.Builder{}
    	for i := int64(0); i < n; i++ {
    		sb.WriteString(str)
    	}
    
    	return sb.String(), nil
    }

    聚合函数(Aggregate Function)

    聚合函数对一个窗口内所有消息的同一字段进行汇总,最后返回一个值,如max, min, sum, count等。

    在Creek中,一个聚合函数体现为一个实现了规定接口的struct。该结构支持初始化,数据累计,到最后产出结果。根据场景不同,还需要支持retract,剔除已经累计的数据,以及两个聚合状态的合并merge。

    具体需要实现的接口如下:

    1. Open(reserved interface{}) 用于初始化,reserved为保留字段
    2. CreateAccumulator() interface{},用于一个窗口状态的初始化,返回类型为任意你定义的状态,例如一个结构体的指针。为了确保状态能够被正确的在snapshot时持久化,应当在文件的init()函数中,向gob注册该结构体。如: gob.Register(&TopWordAccumulator{})
    3. Accumulate(acci interface{}, value interface{}),每当同一窗口中有消息到来,会调用该函数将新消息累计到窗口的状态中去。acci为之前CreateAccumulator返回的状态;value为待聚合的字段。函数的具体实现通常需要对acci和value进行类型的cast。
    4. GetValue(acci interface{}) ([ActualType], error),当一个窗口触发时,调用该函数获取到聚合后的值。acci为之前CreateAccumulator返回的状态;该函数应有两个返回类型,第一个为正常返回的数据类型,如int64, float64等,第二个为error,用来表示返回空值。error为nil表示正常返回,否则意味着返回了空值。
    5. Retract(acci interface{}, value interface{}),从窗口中移除一个已经accumulate过的数据,acci为CreateAccumulator返回的状态, value为待移除数据。通常在OVER窗口中需要,其他窗口如tumble, hop, session窗口无需具体实现,只留一个空实现即可。对于SUM聚合函数,该方法通常从累计值减去value;对于COUNT聚合函数,通常意味着计数器减一。
    6. Merge(acciA interface{}, acciB interface{}),将两个窗口的状态合并,例如当两个session窗口重叠后,需要对两个窗口的状态进行合并。acciA, acciB分别为两个窗口初始化时CreateAccumulator返回的状态,合并后的状态应该存入acciA。其他窗口,如tumble, hop, over等窗口无需真正实现该方法,只留一个空的实现即可。
    7. ResetAccumulator(acci interface{}),重置窗口状态。acci为窗口初始化时CreateAccumulator返回的状态。
    8. Close(),用于状态清理。在作业正常退出时调用。

    示例:

    聚合函数返回一个窗口中,出现最多的单词。由2个struct组成,TopWordAccumulator为窗口的状态;TopWordAggFunction为聚合函数本身。

    TopWordAccumulator:

    每个窗口初始化会创建一个,并且checkpoint时会将其通过gob持久化,recover时通过gob反序列化它,因此需要在源文件的init()函数中,通过gob进行注册。

    func init() {
        gob.Register(&TopWordAccumulator{})
    }

    其内部通过一个map记录每个单词出现的次数,同时提供单词的加入,移除,状态的合并,以及最终结果的获取。其具体实现如下:

    // TopWordAccumulator holds the state of TopWordAggFunction, will return the 
    // most popular word in a given window, by number of appearence.
    // WordCount stores the count of each word.
    type TopWordAccumulator struct {
        WordCount map[string]int64
    }
    
    func (o *TopWordAccumulator) Add(str string, cnt int64) {
        o.WordCount[str] += cnt
    }
    
    func (o *TopWordAccumulator) Remove(str string, cnt int64) {
        o.WordCount[str] -= cnt
        if o.WordCount[str] <= 0 {
            delete(o.WordCount, str)
        }
    }
    
    func (o *TopWordAccumulator) Merge(accb *TopWordAccumulator) {
        for k, v := range accb.WordCount {
            o.Add(k, v)
        }
    }
    
    func (o *TopWordAccumulator) Reset() {
        o.WordCount = make(map[string]int64)
    }
    
    // TopWord returns the word with largest count, return 
    // empty string if no element. You can also return a 
    // nil if state is empty.
    func (o *TopWordAccumulator) TopWord() string {
        maxCnt := int64(0)
        var topWordSoFar string
        for k, v := range o.WordCount {
            if v > maxCnt {
                maxCnt = v
                topWordSoFar = k
            }
        }
        return topWordSoFar
    }

    TopWordAggFunction:

    TopWordAggFunction为SQL中可以引用的函数名,接收一个字符串类型,聚合结果返回也是一个字符串类型。主要作为TopWordAccumulator状态的生命周期管理。

    SQL引用示例:

    SELECT TopWordAggFunction(aString) FROM mysrc GROUP BY …

    其具体实现如下:

    // TopWordAggFunction is the function name can be called from SQL.
    // it accumulate and calcuate the result, with the help
    // of TopWordAccumulator.
    // SQL sample: SELECT TopWordAggFunction('Hello').
    type TopWordAggFunction struct {
      
    }
      
    func (f TopWordAggFunction) Open(reserved interface{}) {
    	// nothing to init.
    }
      
    // CreateAccumulator create a new instance of *TopWordAccumulator, when
    // a new window created.
    func (f TopWordAggFunction) CreateAccumulator() interface{} {
    	return &TopWordAccumulator{WordCount: make(map[string]int64)}
    }
    
    // Accumulate add an element into the a window, parameter types should be interface{}
    // acci is a instance of TopWordAccumulator, create from CreateAccumulator before
    // value is the element to be accumulated, e.g. a string.
    func (f TopWordAggFunction) Accumulate(acci interface{}, value interface{}) {
    	acc := acci.(*TopWordAccumulator)
    	if (value != nil) {
    		val := value.(string)
    		acc.Add(val, 1)
    	}
    }
      
    // GetValue is invoked when a window is going to emit its content.
    // acci is a instance of TopWordAccumulator, create from CreateAccumulator before.
    // To return a nil value, just return a non-nil error.
    func (f TopWordAggFunction) GetValue(acci interface{}) (string, error)  {
    	acc := acci.(*TopWordAccumulator)
    	return acc.TopWord(), nil
    }
    
    // Retract remove a element from the window state. This is required by, e.g. Over window.
    // If your function only needed for Tumble, Hop, Session window, then no need to implement this.
    func (f TopWordAggFunction) Retract(acci interface{}, value interface{}) {
    	acc := acci.(*TopWordAccumulator)
    	if (value != nil) {
    		val := value.(string)
    		acc.Remove(val, 1)
    	}
    }
      
    // Merge is needed for session window, when two windows overlap, then Merge
    // is called to combine the window state.
    // acciA will be result state, so data is merge from acciB to acciA
    func (f TopWordAggFunction) Merge(acciA interface{}, acciB interface{}) {
    	a := acciA.(*TopWordAccumulator)
    	b := acciB.(*TopWordAccumulator)
    	a.Merge(b)
    }
      
      
    func (f TopWordAggFunction) ResetAccumulator(acci interface{}) {
    	acc := acci.(*TopWordAccumulator)
    	acc.Reset()
    }
      
    func (f TopWordAggFunction) Close() {
    	
    }

    表函数(Table Function)

    表函数接收一条消息中的一个或者多个字段,产出0条或者多条消息。

    表函数具体表现为一个实现了Eval和GetType两个接口的struct。

    Eval方法为具体的实现方法,运行时每条输入消息都会调用,返回类型为[]interface{},每一个元素表示一条输出。每一条输出包含一个或者多个值,也用一个[]interface{}表示。因此,接口规定的返回类型为[]interface{},实际返回的数据类型为[][]interface{}。

    GetType方法为编译期辅助系统判断Eval实际类型的函数,运行时不会被调用。该方法返回的类型,为Eval每一条返回中各个字段的具体类型。

    示例:

    Split函数对一个字符串进行split,返回每个分隔后的字符串以及它的长度。输入参数str为待分隔的字符串,sep为分隔符。因此函数签名为:

    Eval(str string, sep string) []interface{}

    由于系统无法从[]interface{}中推断具体返回的每条消息的类型,需要查看GetType方法的返回类型,因为返回的类型为一个字符串,和一个整数型长度,因此GetType的签名为:

    GetType() (string, int64)

    SQL引用参考:

    SELECT deviceid, word, length from mysrc , LATERAL TABLE(Split(deviceid, ',')) as T(word, length)

    函数的具体实现:

    // Split is the table function, this struct need to implement two
    // methods: Eval and GetType. 
    // Eval is the one actually be invoked during runtime, while GetType 
    // is just a help class that provid actual returned types of Eval.
    // SQL sample:  
    // SELECT msg, word, length from mysrc, LATERAL TABLE(Split(msg, ',')) as T(word, length).
    type Split struct {
    
    }
    
    // Eval is the method that a table faunction need to implement, and the
    // the return type MUST be []interface{}, where each element is a row,
    // again it is presented by an other []interface. So the actual returned type
    // should be [][]interface.
    func (o Split) Eval(str string, sep string) []interface{} {
    	parts := strings.Split(str, sep)
    	ret := make([]interface{}, len(parts))
    	for i, p := range parts {
    		elem := make([]interface{}, 2)
    		elem[0] = p
    		elem[1] = int64(len(p))
    		ret[i] = elem
    	}
    
    	return ret
    }
    
    // GetType is dummy method that never be called, it is used to
    // tell the compiler what actual types are returned by Eval method,
    // which is required to return a []interface, where compiler
    // could not tell the concrete type by Eval alone. 
    func (o Split) GetType() (string, int64) {
    	return "", 0
    }
    上一篇
    运行时设置
    下一篇
    API 参考