DStream::ruleEngine

语法

DStream::ruleEngine(ruleSets, outputColumns, [policy], [ruleSetColumn], [callback])

详情

创建流计算规则引擎。参考:createRuleEngine

返回值:一个 DStream 对象。

参数

ruleSets 是一个字典,表示规则集。字典的 key 是 STRING 或 INT 类型,value 是一个包含元代码的元组。如果 key 为NULL,表示此条为默认规则。一个规则集必须包含默认规则。

outputColumns 一个 STRING 类型向量,表示输入表中需要保留到输出表的列。

policy 是一个字符串标量,表示规则检查策略。可取以下值:
  • "shortcut" 是默认值,代表短路逻辑。当检查到任一规则不符合(该规则的计算结果为 false)时,返回对应的 index ,index 从 0 开始;否则返回 NULL。

  • "all" 检查全部规则,返回一个 BOOL 类型的数组向量,其元素的布尔值是按照规则集 key 对应的规则执行的结果。

ruleSetColumn 是一个 STRING 类型标量,为输入表的某一列名,如果没有定义或者输入数据中的该列数据没有命中任何一个规则集,则使用默认规则。

callback 是一个函数,其参数为一个表,是引擎输出的一行。若设置此参数,引擎每处理一行,在将引擎处理结果输出到输出表的同时,会将该结果作为入参调用此函数。此参数未设置时,仅将引擎处理结果输出到输出表。

例子

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// 如已存在流图,则先销毁该流图
// dropStreamGraph('engine')
g = createStreamGraph('engine')

// 设置规则集
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)

// 创建分布式表,用于回调函数写入数据
if(existsDatabase("dfs://temp")){
    dropDatabase("dfs://temp")
}
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)

// 创建回调函数,根据检测结果,将数据写入分布式表
def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}

g.source("trades", 1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"], policy="all", ruleSetColumn="sym", callback=writeBack)
.sink("output")
g.submit()
go

tmp=table(1 1 as sym, 0 2 as value, 2 2 as price, 3 3 as quantity)
appendOrcaStreamTable("trades", tmp)

select * from orca_table.output
sym value price rule
1 0 2 [false]
1 2 2 [true]