DStream::fork

语法

DStream::fork(count)

参数

count 整数,指定分叉数量。

详情

生成多个流数据下游 DStream 分支(以广播方式复制数据),便于在不同分支中执行各自的处理逻辑。

返回值:指定数量的 DStream 实例列表。

例子

将 K 线数据复制两份,分别计算 1 分钟和 5 分钟级因子:

use catalog test

g = createStreamGraph("indicators")
sourceStreams = g.source("trade", 1024:0, `symbol`datetime`price`volume, [SYMBOL, TIMESTAMP,DOUBLE, INT])
    .fork(2)
stream_1min = sourceStreams[0]
    .timeSeriesEngine(60*1000, 60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_1min")
stream_5min = sourceStreams[1]
    .timeSeriesEngine(5*60*1000, 5*60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_5min")
g.submit()