StreamGraph::sourceByName
语法
StreamGraph::sourceByName(name)
详情
按表名引用流图中已提交(即已经通过 submit 注册)的共享流表对象。
返回值:一个 DStream 对象。
参数
name 表示流表的名称。字符串标量,可以传入完整的流表全限定名(如 trading.orca_graph.trades);也可以仅提供流表名(如 trades),系统会根据当前的 catalog 设置自动补全为对应的全限定名。
例子
首先创建并提交流图 “aggregation”。
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
aggGraph = createStreamGraph("aggregation")
aggGraph.source("trade", 1000:0, `time`sym`price, [TIMESTAMP, SYMBOL, FLOAT])
.timeSeriesEngine(windowSize=60, step=60, metrics=[<sum(price) as price>], timeColumn="time", keyColumn="sym")
.sink("aggregated")
aggGraph.submit()
在另一个流图中,通过 sourceByName
引用已提交流图 “aggregation” 中的输出流表
"aggregated"。
def EMA(S, N) {
return ::ewmMean(S, span = N, adjust = false)
}
indicatorGraph = createStreamGraph("indicators")
indicatorGraph.sourceByName("aggregated")
.reactiveStateEngine(metrics=[<EMA(price, 20)>], keyColumn=`sym)
.sink("indicators")
indicatorGraph.submit()