自定义函数
所有文档

          规则引擎 Rule Engine

          自定义函数

          自定义函数可以实现Creek内置函数以外的计算逻辑。自定义函数以go语言源文件的形式提交,在作业定义中表现为一个文件名和一个经过gzip压缩后进行base64编码的字符串。

          一个作业可以提交多个源文件,每个源文件可以包含多个函数。在作业的SQL中可以直接调用同名(大小写敏感)的go函数。目前支持的函数包含以下三种:

          • 标量函数(Scalar Function)
          • 聚合函数(Aggreate Function)
          • 表函数(Table Function)

          其他限制:

          1. 源文件的package必须为udf。因此源文件的开头应为: package udf
          2. 只允许导入系统库,不支持第三方库的import
          3. 函数名、结构名和要求的方法名,需要为导出名,即首字母大写
          4. 多个源文件名不允许重复,亦不可以取名:pkginit.go或者systemudf.go

          为了与数据源中数据类型尽量保持一致,整数类型尽量用int64, 浮点数尽量用float64。返回值为了支持返回空值,通常以一个额外的error的返回类型来表示。error等于nil表示成功,否则表示返回的值为空。

          作业定义示例

          如果您手动编辑作业定义,你需要自己完成对源码的编码(也可以访问http://www.txtwizard.net/compression 进行编解码)。假如你需要增加的源文件信息如下:

          文件名: myudf.go

          文件的内容:

          package udf
          
          import(
          	"strings"
          )
          
          func RepeatString(str string, n int64) (string, error) {
          	sb := strings.Builder{}
          	for i := int64(0); i < n; i++ {
          		sb.WriteString(str)
          	}
          
          	return sb.String(), nil
          }

          作业定义应该为如下格式:

          {
          	"udfInfo": [{
          		"name": "myudf.go",
          		"content": "H4sIAAAAAAAA/0WOMQ7CMAxF5/gUVqdERRUDYgC6cAQYmEPrVhYlrZxkqnJ30lLEZPm//2RPtnnZnjC2HQC/p1GCBlX4IOx6X4AB6KJr8EYT2XBfY50pfhs7dMguHA8G9S8hkVEMzqD8E0/11vTVNfLQkswJVDcK8sJWV+/NOa8XdHmU5WJmtXoIB/pfNKASgBIKURxmviGTf+AB0gfscVsxygAAAA=="
          	}]
          ...
          }

          标量函数(Scalar Function)

          标量函数接收一条消息中的零个或者多个字段作为输入,输出一个值,如substring, sin, pow等。为了支持返回空值的情况,函数的返回值应该有两个类型:一个正常返回值类型,一个error.

          因此其具体要求为:

          1. 输入参数为0个,或者多个
          2. 返回参数为2个,一个正常返回类型,一个error类型
          3. 当函数正常返回时,error应该为nil;当函数返回空值时,error为非空

          示例

          RepeatString实现对一个字符串重复n次之后返回。接收一个字符串和一个整数n,返回string类型。函数的实现中返回的error为nil,表示函数正常返回了string。

          在SQL中,可以如下方式调用该函数:

          SELECT RepeatString(deviceid, cast(2 as bigint)) FROM ...

          源码具体实现如:

          // RepeatString is a scalar function that repeasts a string for n time.
          // str is the string to be repeated,
          // n is the time to repeat. If n <= 0, an empty string will be returned.
          // for input: ("Hi", 2), will get "HiHi".
          // To return a nil value, just return non-nil error.
          // SQL sample: SELECT RepeatString('Hi', cast(2 as bigint)).
          func RepeatString(str string, n int64) (string, error) {
          	sb := strings.Builder{}
          	for i := int64(0); i < n; i++ {
          		sb.WriteString(str)
          	}
          
          	return sb.String(), nil
          }

          聚合函数(Aggregate Function)

          聚合函数对一个窗口内所有消息的同一字段进行汇总,最后返回一个值,如max, min, sum, count等。

          在Creek中,一个聚合函数体现为一个实现了规定接口的struct。该结构支持初始化,数据累计,到最后产出结果。根据场景不同,还需要支持retract,剔除已经累计的数据,以及两个聚合状态的合并merge。

          具体需要实现的接口如下:

          1. Open(reserved interface{}) 用于初始化,reserved为保留字段
          2. CreateAccumulator() interface{},用于一个窗口状态的初始化,返回类型为任意你定义的状态,例如一个结构体的指针。为了确保状态能够被正确的在snapshot时持久化,应当在文件的init()函数中,向gob注册该结构体。如: gob.Register(&TopWordAccumulator{})
          3. Accumulate(acci interface{}, value interface{}),每当同一窗口中有消息到来,会调用该函数将新消息累计到窗口的状态中去。acci为之前CreateAccumulator返回的状态;value为待聚合的字段。函数的具体实现通常需要对acci和value进行类型的cast。
          4. GetValue(acci interface{}) ([ActualType], error),当一个窗口触发时,调用该函数获取到聚合后的值。acci为之前CreateAccumulator返回的状态;该函数应有两个返回类型,第一个为正常返回的数据类型,如int64, float64等,第二个为error,用来表示返回空值。error为nil表示正常返回,否则意味着返回了空值。
          5. Retract(acci interface{}, value interface{}),从窗口中移除一个已经accumulate过的数据,acci为CreateAccumulator返回的状态, value为待移除数据。通常在OVER窗口中需要,其他窗口如tumble, hop, session窗口无需具体实现,只留一个空实现即可。对于SUM聚合函数,该方法通常从累计值减去value;对于COUNT聚合函数,通常意味着计数器减一。
          6. Merge(acciA interface{}, acciB interface{}),将两个窗口的状态合并,例如当两个session窗口重叠后,需要对两个窗口的状态进行合并。acciA, acciB分别为两个窗口初始化时CreateAccumulator返回的状态,合并后的状态应该存入acciA。其他窗口,如tumble, hop, over等窗口无需真正实现该方法,只留一个空的实现即可。
          7. ResetAccumulator(acci interface{}),重置窗口状态。acci为窗口初始化时CreateAccumulator返回的状态。
          8. Close(),用于状态清理。在作业正常退出时调用。

          示例:

          聚合函数返回一个窗口中,出现最多的单词。由2个struct组成,TopWordAccumulator为窗口的状态;TopWordAggFunction为聚合函数本身。

          TopWordAccumulator:

          每个窗口初始化会创建一个,并且checkpoint时会将其通过gob持久化,recover时通过gob反序列化它,因此需要在源文件的init()函数中,通过gob进行注册。

          func init() {
              gob.Register(&TopWordAccumulator{})
          }

          其内部通过一个map记录每个单词出现的次数,同时提供单词的加入,移除,状态的合并,以及最终结果的获取。其具体实现如下:

          // TopWordAccumulator holds the state of TopWordAggFunction, will return the 
          // most popular word in a given window, by number of appearence.
          // WordCount stores the count of each word.
          type TopWordAccumulator struct {
              WordCount map[string]int64
          }
          
          func (o *TopWordAccumulator) Add(str string, cnt int64) {
              o.WordCount[str] += cnt
          }
          
          func (o *TopWordAccumulator) Remove(str string, cnt int64) {
              o.WordCount[str] -= cnt
              if o.WordCount[str] <= 0 {
                  delete(o.WordCount, str)
              }
          }
          
          func (o *TopWordAccumulator) Merge(accb *TopWordAccumulator) {
              for k, v := range accb.WordCount {
                  o.Add(k, v)
              }
          }
          
          func (o *TopWordAccumulator) Reset() {
              o.WordCount = make(map[string]int64)
          }
          
          // TopWord returns the word with largest count, return 
          // empty string if no element. You can also return a 
          // nil if state is empty.
          func (o *TopWordAccumulator) TopWord() string {
              maxCnt := int64(0)
              var topWordSoFar string
              for k, v := range o.WordCount {
                  if v > maxCnt {
                      maxCnt = v
                      topWordSoFar = k
                  }
              }
              return topWordSoFar
          }

          TopWordAggFunction:

          TopWordAggFunction为SQL中可以引用的函数名,接收一个字符串类型,聚合结果返回也是一个字符串类型。主要作为TopWordAccumulator状态的生命周期管理。

          SQL引用示例:

          SELECT TopWordAggFunction(aString) FROM mysrc GROUP BY …

          其具体实现如下:

          // TopWordAggFunction is the function name can be called from SQL.
          // it accumulate and calcuate the result, with the help
          // of TopWordAccumulator.
          // SQL sample: SELECT TopWordAggFunction('Hello').
          type TopWordAggFunction struct {
            
          }
            
          func (f TopWordAggFunction) Open(reserved interface{}) {
          	// nothing to init.
          }
            
          // CreateAccumulator create a new instance of *TopWordAccumulator, when
          // a new window created.
          func (f TopWordAggFunction) CreateAccumulator() interface{} {
          	return &TopWordAccumulator{WordCount: make(map[string]int64)}
          }
          
          // Accumulate add an element into the a window, parameter types should be interface{}
          // acci is a instance of TopWordAccumulator, create from CreateAccumulator before
          // value is the element to be accumulated, e.g. a string.
          func (f TopWordAggFunction) Accumulate(acci interface{}, value interface{}) {
          	acc := acci.(*TopWordAccumulator)
          	if (value != nil) {
          		val := value.(string)
          		acc.Add(val, 1)
          	}
          }
            
          // GetValue is invoked when a window is going to emit its content.
          // acci is a instance of TopWordAccumulator, create from CreateAccumulator before.
          // To return a nil value, just return a non-nil error.
          func (f TopWordAggFunction) GetValue(acci interface{}) (string, error)  {
          	acc := acci.(*TopWordAccumulator)
          	return acc.TopWord(), nil
          }
          
          // Retract remove a element from the window state. This is required by, e.g. Over window.
          // If your function only needed for Tumble, Hop, Session window, then no need to implement this.
          func (f TopWordAggFunction) Retract(acci interface{}, value interface{}) {
          	acc := acci.(*TopWordAccumulator)
          	if (value != nil) {
          		val := value.(string)
          		acc.Remove(val, 1)
          	}
          }
            
          // Merge is needed for session window, when two windows overlap, then Merge
          // is called to combine the window state.
          // acciA will be result state, so data is merge from acciB to acciA
          func (f TopWordAggFunction) Merge(acciA interface{}, acciB interface{}) {
          	a := acciA.(*TopWordAccumulator)
          	b := acciB.(*TopWordAccumulator)
          	a.Merge(b)
          }
            
            
          func (f TopWordAggFunction) ResetAccumulator(acci interface{}) {
          	acc := acci.(*TopWordAccumulator)
          	acc.Reset()
          }
            
          func (f TopWordAggFunction) Close() {
          	
          }

          表函数(Table Function)

          表函数接收一条消息中的一个或者多个字段,产出0条或者多条消息。

          表函数具体表现为一个实现了Eval和GetType两个接口的struct。

          Eval方法为具体的实现方法,运行时每条输入消息都会调用,返回类型为[]interface{},每一个元素表示一条输出。每一条输出包含一个或者多个值,也用一个[]interface{}表示。因此,接口规定的返回类型为[]interface{},实际返回的数据类型为[][]interface{}。

          GetType方法为编译期辅助系统判断Eval实际类型的函数,运行时不会被调用。该方法返回的类型,为Eval每一条返回中各个字段的具体类型。

          示例:

          Split函数对一个字符串进行split,返回每个分隔后的字符串以及它的长度。输入参数str为待分隔的字符串,sep为分隔符。因此函数签名为:

          Eval(str string, sep string) []interface{}

          由于系统无法从[]interface{}中推断具体返回的每条消息的类型,需要查看GetType方法的返回类型,因为返回的类型为一个字符串,和一个整数型长度,因此GetType的签名为:

          GetType() (string, int64)

          SQL引用参考:

          SELECT deviceid, word, length from mysrc , LATERAL TABLE(Split(deviceid, ',')) as T(word, length)

          函数的具体实现:

          // Split is the table function, this struct need to implement two
          // methods: Eval and GetType. 
          // Eval is the one actually be invoked during runtime, while GetType 
          // is just a help class that provid actual returned types of Eval.
          // SQL sample:  
          // SELECT msg, word, length from mysrc, LATERAL TABLE(Split(msg, ',')) as T(word, length).
          type Split struct {
          
          }
          
          // Eval is the method that a table faunction need to implement, and the
          // the return type MUST be []interface{}, where each element is a row,
          // again it is presented by an other []interface. So the actual returned type
          // should be [][]interface.
          func (o Split) Eval(str string, sep string) []interface{} {
          	parts := strings.Split(str, sep)
          	ret := make([]interface{}, len(parts))
          	for i, p := range parts {
          		elem := make([]interface{}, 2)
          		elem[0] = p
          		elem[1] = int64(len(p))
          		ret[i] = elem
          	}
          
          	return ret
          }
          
          // GetType is dummy method that never be called, it is used to
          // tell the compiler what actual types are returned by Eval method,
          // which is required to return a []interface, where compiler
          // could not tell the concrete type by Eval alone. 
          func (o Split) GetType() (string, int64) {
          	return "", 0
          }
          上一篇
          运行时设置
          下一篇
          API 参考