DStream::timeBucketEngine
语法
DStream::timeBucketEngine(timeCutPoints, metrics, timeColumn, [keyColumn],
[useWindowStartTime], [closed='left'], [fill='none'], [keyPurgeFreqInSecond=-1],
[parallelism=1])
详情
创建流计算自定义窗口长度(长度相同或不同)的时间序列聚合引擎,支持基于该引擎定义后续处理逻辑。
返回值:一个 DStream 对象。
参数
-
timeCutPoints 指定的时间精度必须不高于 timeColumn 列的精度。
-
timeCutPoints 指定的时间精度决定关闭窗口右边界的时间精度。如果 timeCutPoints 是 MINUTE 类型,则窗口右边界的时间精度是分;如果 timeCutPoints 是 SECOND 类型,则窗口右边界的时间精度是秒。
-
计算指标可以是一个或多个系统内置或用户自定义的聚合函数(使用 defg 关键字定义),如 <[sum(volume), avg(price)]>;可以对聚合结果使用表达式,如 <[avg(price1)-avg(price2)]>;也可对列与列的计算结果进行聚合计算,如 <[std(price1-price2)]>。
-
metrics 内支持调用具有多个返回值的函数,例如 <func(price) as `col1`col2>(可不指定列名)。
-
若 windowSize 为向量, windowSize 每个值可对应 metrics 中多个计算指标。例如,windowSize 为[10,20]时,metrics可为 (<[min(volume), max(volume)]>, <sum(volume)>)。 metrics 也可以嵌套输入元组向量。例如:[[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]]
注:- metrics 中使用的列名大小写不敏感,不需要与输入表的列名大小写保持一致。
- metrics 中不可使用嵌套聚合函数。
timeColumn 字符串标量或长度为2的向量,用于输入表中时间列的名称。
keyColumn 可选参数,字符串标量或向量,表示分组列名。若设置,则分组进行聚合计算,例如以每支股票为一组进行聚合计算。
useWindowStartTime 可选参数。布尔值,表示输出表中的时间是否为数据窗口起始时间。默认值为 false,表示输出表中的时间为窗口结束时间。
-
closed = 'left': 窗口左闭右开。
-
closed = 'right': 窗口左开右闭。
- 'none': 不输出结果。
- 'null': 输出结果为 NULL。
- 'ffill': 输出上一个有数据的窗口的结果。
- '具体数值':该值的数据类型需要和对应的 metrics 计算结果的类型保持一致。
keyPurgeFreqInSec 正整数,表示清理窗口数据为空的分组的时间间隔,单位为秒。指定该参数后,若当前数据注入时间与上一次清理时间的间隔大于等于 keyPurgeFreqInSec,则触发对当前窗口数据为空的分组信息的清理。若指定该参数,则必须指定 keyColumn,且不能指定 fill。
parallelism 为不超过 63 的正整数,可选参数,表示并行计算的工作线程数,默认值为 1。在计算量较大时,合理地调整该参数能够有效利用计算资源,降低计算耗时。建议小于机器核数,推荐值为 4 到 8 。
例子
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// 如已存在流图,则先销毁该流图
// dropStreamGraph('engine')
g = createStreamGraph('engine')
g.source("trades", 1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT])
.timeSeriesEngine(windowSize=60000, step=60000, metrics=<[first(price), max(price), min(price), last(price), sum(volume)]>, timeColumn=`time, useSystemTime=false, keyColumn=`sym, useWindowStartTime=false)
.timeBucketEngine(timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m], metrics=<[first(first_price), max(max_price), min(min_price), last(last_price), sum(sum_volume)]>, timeColumn=`time, keyColumn=`sym)
.sink("output")
g.submit()
go
times = [2024.10.08T10:01:01.785, 2024.10.08T10:01:02.125, 2024.10.08T10:01:12.457, 2024.10.08T10:03:10.789, 2024.10.08T10:03:12.005, 2024.10.08T10:08:02.236, 2024.10.08T10:08:04.412, 2024.10.08T10:08:05.152, 2024.10.08T10:08:30.021, 2024.10.08T10:10:20.123, 2024.10.08T10:11:02.236, 2024.10.08T10:13:04.412, 2024.10.08T10:15:12.005]
syms = [`A, `B, `A, `A, `B, `A, `B, `B, `A, `A, `A, `B, `B]
prices = [10.83, 21.73, 10.79, 11.81, 22.96, 11.25, 23.03, 23.18, 11.04, 11.85, 11.06, 23.15, 22.06]
volumes = [2110, 1600, 2850, 2250, 1980, 2400, 2130, 1900, 2300, 2200, 2200, 1880, 2100]
tmp = table(times as time, syms as sym, prices as price, volumes as volume)
appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
time | sym | first_first_price | max_max_price | min_min_price | last_last_price | sum_sum_volume |
---|---|---|---|---|---|---|
2024.10.08 10:05:00.000 | A | 10.83 | 11.81 | 10.79 | 11.81 | 7,210 |
2024.10.08 10:05:00.000 | B | 21.73 | 22.96 | 21.73 | 22.96 | 3,580 |
2024.10.08 10:10:00.000 | A | 11.25 | 11.25 | 11.04 | 11.04 | 4,700 |
2024.10.08 10:10:00.000 | B | 23.03 | 23.18 | 23.03 | 23.18 | 4,030 |
2024.10.08 10:15:00.000 | B | 23.15 | 23.15 | 23.15 | 23.15 | 1,880 |