DStream::cryptoOrderBookEngine
语法
DStream::cryptoOrderBookEngine(inputColMap, depth, [updateRule='direct'],
[errorHandler=NULL], [cachingInterval=5000], [timeout=-1],
[cachedDepth])
详情
创建流计算数字货币实时订单簿引擎,支持根据数字货币全量深度快照和增量深度信息,实时更新订单簿。参考:createCryptoOrderBookEngine。
返回值:一个 DStream 对象。
参数
inputColMap 字典,将 dummyTable 中列字段的名称映射为引擎计算所需要的列。
-
正整数:所有数字货币的订单簿深度都为该设置值。
-
字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示订单簿深度。对于指定的数字货币,将按照设定的深度输出订单簿。未被指定的数字货币将不输出订单簿结果。
- "direct":默认值,表示直接更新,系统将不进行数据丢失判断,仅根据 isIncremental 字段的值绝对是更新(isIncremental 为 true)还是覆盖(isIncremental 为 false)订单簿结果。
- "general":表示通用的更新规则。要求按顺序插入增量数据,并确保数据中 updateId 单调递增。
- "Binance-spot":表示适用于币安交易所现货的更新规则。
- "Binance-futures":表示适用于币安交易所期货的更新规则。
- 第一个是字符串标量,表示数字货币代码。
- 第二个是整型标量,表示错误码,包括以下情况:
- 1:接收到历史数据。
- 2:接收到应在未来时间点到达的乱序数据。
- 3:超时错误,指定时间间隔内没有新的订单簿合成。
- 4:买卖价格交叉错误。判断交叉的条件为最高买价大于等于最低卖价。
cachingInterval 可选参数,整型标量,表示缓存增量深度数据的时间间隔。默认值为 5000,单位为毫秒。对于每个数字货币,当缓存中的数据时间与当前最新一条数据时间的差值小于等于 cachingInterval 时,该数据才会被保留在缓存中。
timeout 可选参数,整型标量,表示超时时间。单位为毫秒,默认值为 -1,表示不设置超时时间。在 timeout 指定的时间内未合成新的订单簿时,将触发超时错误,此时会通过 errorHandler 进行错误处理。
cachedDepth 可选参数,正整数或字典,用于指定缓存中价格和数量(包括 askQty, askPrice, bidQty, bidPrice)的深度。
-
正整数:缓存中所有数字货币的价格和数量将按照该值的深度进行缓存。
-
字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示对应的深度。对于指定的数字货币,将按照设定的深度缓存价格和数量。未被指定的数字货币将保留实际收到的所有价格和数量信息。
例子
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// Define input table schema
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]
inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']
// Map input columns
inputColMap = dict(inputTarget, inputSource)
// Set depth
depth = dict(["BTCUSDT"], [1000])
cptGraph = createStreamGraph("cptEngine")
cptGraph.source("cptInput", 1000:0, colNames,colTypes)
.cryptoOrderBookEngine(inputColMap, depth)
.sink("output")
cptGraph.submit()