DStream::reactiveStateEngine
语法
DStream::reactiveStateEngine(metrics, [keyColumn], [filter], [keepOrder],
[keyPurgeFilter], [keyPurgeFreqInSecond=0], [keyCapacity=1024],
[parallelism=1])
详情
创建流计算响应式状态引擎。参考:createReactiveStateEngine。
返回值:一个 DStream 对象。
参数
- 需在定义前添加声明 "@state"。状态函数只能包含赋值语句和 return 语句。
自 2.00.9 版本起,支持使用 if-else 条件语句,且条件只能是标量。
自2.00.11 版本起,支持使用 for 循环(包含 break, continue 语句),请注意不支持嵌套 for 循环,且循环次数须小于 100 次。
- 状态引擎中可以使用无状态函数或者状态函数。但不允许在无状态函数中嵌套使用状态函数。
- 若赋值语句的右值是一个多返回值的函数(内置函数或自定义函数),则需要将多个返回值同时赋予多个变量。例如:两个返回值的函数 linearTimeTrend
应用于自定义状态函数中,正确写法为:
@state def forcast2(S, N){ linearregIntercept, linearregSlope = linearTimeTrend(S, N) return (N - 1) * linearregSlope + linearregIntercept }
keyColumn 可选参数,字符串标量或向量表示分组列名。若指定该参数,计算将在各分组进行。
filter 可选参数,以元代码的形式表示过滤条件。过滤条件只能是一个表达式,并且只能包含 dummyTable 中的列。设置多个条件时,用逻辑运算符(and, or)连接。引擎会先计算指标,然后根据 filter 指定的过滤条件,输出满足条件的输入数据对应的计算结果。
keepOrder 可选参数,表示输出表数据是否按照输入时的顺序排序。设置 keepOrder = true,表示输出表按照输入时的顺序排序。当 keyColumn 包含有时间列时,keepOrder 默认值为 true,否则默认值为 false。
keyPurgeFilter 可选参数,是一个由布尔表达式组成的元代码,表示清理条件。各表达式只能引用 outputTable 中的字段。必须指定 keyColumn 才能使用该参数。
keyPurgeFreqInSecond 正整数,表示触发数据清理需要满足的时间间隔(以秒为单位)。必须指定 keyColumn 才能使用该参数。
keyCapacity 正整数,可选参数,表示建表时系统为该表预分配的 key 分组数量,用于调整状态表中 key 的函数。通过该参数的合理设置,能够降低在 key 分组较多时可能出现的延迟。
parallelism 不超过63的正整数,可选参数,表示并行计算的工作线程数,默认值为 1。在计算量较大时,合理地调整该参数能够有效利用计算资源,降低计算耗时。
例子
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// 如已存在流图,则先销毁该流图
// dropStreamGraph('engine')
g = createStreamGraph('engine')
g.source("trades", 1000:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
.reactiveStateEngine(metrics=<mavg(price, 3)>, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
.sink("output")
g.submit()
go
n=100
tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
appendOrcaStreamTable("trades", tmp)