StreamGraph::submit
语法
StreamGraph::submit([checkpointConfig])
参数
checkpointConfig 可选参数,一个字典,指定了流图 checkpoint 相关配置。可用配置如下:
key | 解释 | value 取值 | 默认值 |
---|---|---|---|
enable | 是否开启 Checkpoint | true/false | false |
interval | 触发 Checkpoint 的时间间隔,单位为毫秒 | [10秒, 1年] | 1小时 |
timeout | Checkpoint 的超时时间,即在指定时间内无法完成的 Checkpoint 将判定为失败,单位毫秒 | [1秒,1小时] | 10分钟 |
alignedTimeout | Barrier 对齐的超时时间,即在指定时间内无法完成 Barrier 对齐的Checkpoint 将被判定为失败,单位毫秒 | [100毫秒,1小时] | 10分钟 |
minIntervalBetweenCkpt | 从上一个 Checkpoint 完成到下一个 Checkpoint 发起之间的最小时间间隔 | [0,1年] | 0 |
consecutiveFailures | Checkpoint 最大连续失败次数。超过该次数后会导致整个流图的状态转换为 ERROR。 | [0, 102400] | 3 |
maxConcurrentCheckpoints | 允许 Checkpoint 并发个数。请注意,允许并发Checkpoint可能会对运行中的流计算任务产生影响。 | [1, 102400] | 1 |
maxRetainedCheckpoints | 系统会定期清理历史 Checkpoint 的数据,该配置项用于设置最多保留多少个最新的 Checkpoint 数据。 | [1, 1024] | 3 |
详情
提交流图。
在集群部署模式下,必须在计算节点上执行该函数,且执行用户必须是管理员用户或具备 COMPUTE_GROUP_EXEC 权限的用户,才能成功提交任务。若在单节点部署环境中使用,则无需进行权限校验,可直接提交。
例子
提交流图 g 并通过参数指定 checkpoint 设置。
关于提交与使用流图的详细说明,请参见主题页。
if (!existsCatalog("demo")) {
createCatalog("demo")
}
go
use catalog demo
// 配置参数字典
ckptConfig = {
"enable":true,
"interval": 10000,
"timeout": 36000,
"maxConcurrentCheckpoints": 1
};
// 指定要计算的指标,此处仅作简单示意
aggregators = [
<first(price) as open>,
<max(price) as high>,
<min(price) as low>,
<last(price) as close>,
<sum(volume) as volume>
]
indicators = [
<time>,
<high>,
<low>,
<close>,
<volume>
]
// 定义流图
g = createStreamGraph("indicators")
g.source("trade", 1:0, `time`symbol`price`volume, [DATETIME,SYMBOL,DOUBLE,LONG])
.timeSeriesEngine(windowSize=60, step=60, metrics=aggregators, timeColumn=`time, keyColumn=`symbol)
.buffer("one_min_bar")
.reactiveStateEngine(metrics=indicators, keyColumn=`symbol)
.buffer("one_min_indicators")
// 提交流图,同时配置 checkpoint
g.submit(ckptConfig)
相关函数:getOrcaCheckpointConfig, setOrcaCheckpointConfig (更多相关函数,请参见主题页)