Reputation: 1
I generate 1-minute OHLC bars from ticks, then generate 5-minute OHLC bars from the 1-minute bars using updateTime=60000. Both are stored in stream tables. When I feed the 5-minute OHLC into a reactive state engine (or any other engine or table), it receives all the rows for each timestamp, including the ones I do not want. I want to keep only the last row for each unique timestamp when I use subscribeTable. Refer to the attached images for more information. Please help me solve the problem.
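To make the requirement concrete, here is a minimal plain-Python sketch of the result I am after: for each (symbol, timestamp) pair, only the most recently received row survives. This is not DolphinDB code. The function name `keep_last_per_key`, the field names, and the sample rows are all hypothetical and only illustrate the intended semantics.

```python
def keep_last_per_key(rows, key_fields=("symbol", "ts")):
    """Keep only the last-arriving row for each key.

    Keys stay in the order they were first seen; a later row with the
    same key overwrites the earlier one.
    """
    latest = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        latest[key] = row  # a later row replaces the earlier one
    return list(latest.values())


# Hypothetical rows: three updates for the 09:20 bar, one for 09:25.
rows = [
    {"symbol": "BTCUSDT", "ts": "09:20", "close": 100.0},
    {"symbol": "BTCUSDT", "ts": "09:20", "close": 101.5},
    {"symbol": "BTCUSDT", "ts": "09:20", "close": 102.0},
    {"symbol": "BTCUSDT", "ts": "09:25", "close": 103.0},
]

deduped = keep_last_per_key(rows)
for row in deduped:
    print(row)
```

Only the final 09:20 row and the 09:25 row remain, which is the shape I want downstream consumers to see.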
Edit 1: Here is the code. Python code that inserts data into stream table t:
s.run("tableInsert(t, (convertTZ(timestamp({3}),'UTC','Asia/Kolkata')), `{0}, {1}, {2}, convertTZ(now(),'Asia/Kolkata','Asia/Kolkata'));".format(str( msg['s']),float( msg['p']),float( msg['q']),int( msg['T'])))
1 Min OHLC
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm1, cacheSize=1200000, retentionMinutes=15)
OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(vol) as volume, last(upd_tick) as last_tick, now() as upd_1m]>, dummyTable=tmp, outputTable=sm1, timeColumn=`ts, useSystemTime=true, keyColumn=`symbol, updateTime=60000, useWindowStartTime=false);
subscribeTable(tableName="t", actionName="OHLC_sm1", offset=0, handler=append!{OHLC_sm1}, msgAsTable=true, hash=1)
5 Min OHLC
enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm5, cacheSize=1200000, retentionMinutes=30)
ohlc=<[first(open) as open, max(high) as high, min(low) as low, last(close) as close, sum(volume) as volume, last(upd_1m) as last_tick, now() as upd_5m]>
OHLC_sm5 = createTimeSeriesEngine(name="OHLC_sm5", windowSize=300000, step=300000, metrics=ohlc, dummyTable=tmp, outputTable=sm5, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol, updateTime=60000, useWindowStartTime=true);
subscribeTable(tableName="sm1", actionName="OHLC_sm5", offset=0, handler=append!{OHLC_sm5}, msgAsTable=true, hash=1)
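As far as I can tell, the extra rows come from the 5-minute window being recalculated every updateTime interval while it is still open, so one window produces several rows with the same timestamp. The plain-Python sketch below only simulates that assumed behaviour; it is not how the engine is implemented, and `partial_window_updates` and the sample bars are hypothetical.

```python
def partial_window_updates(one_min_bars, bars_per_window=5):
    """Return the running OHLC of each window after every 1-minute bar.

    Each window yields one row per bar received so far; only the last
    row of a window reflects the complete 5-minute bar.
    """
    results = []
    for start in range(0, len(one_min_bars), bars_per_window):
        chunk = one_min_bars[start:start + bars_per_window]
        for n in range(1, len(chunk) + 1):
            seen = chunk[:n]  # bars that have arrived so far in this window
            results.append({
                "window": start // bars_per_window,
                "open": seen[0]["open"],
                "high": max(bar["high"] for bar in seen),
                "low": min(bar["low"] for bar in seen),
                "close": seen[-1]["close"],
                "volume": sum(bar["volume"] for bar in seen),
            })
    return results


# Hypothetical 1-minute bars that make up a single 5-minute window.
bars = [
    {"open": 10, "high": 12, "low": 9, "close": 11, "volume": 1},
    {"open": 11, "high": 13, "low": 10, "close": 12, "volume": 2},
    {"open": 12, "high": 15, "low": 11, "close": 13, "volume": 3},
    {"open": 13, "high": 14, "low": 12, "close": 14, "volume": 4},
    {"open": 14, "high": 16, "low": 13, "close": 15, "volume": 5},
]

updates = partial_window_updates(bars)
for row in updates:
    print(row)
```

Five rows come out for one window, and a subscriber that appends them all will treat each one as a separate 5-minute bar. Only the last row is the one I want to pass on.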
reactive state engine code:
enableTableShareAndPersistence(table=streamTable(25000:0,rsesm5_colNames,rsesm5_colTypes), tableName=`rsesm5, cacheSize=1200000, retentionMinutes=1440)
rse5m = createReactiveStateEngine(name="rse5m", metrics =<[ts,close,mavg(close,4),now()]>, dummyTable=sm5, outputTable=rsesm5, keyColumn="symbol")
subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1)
Current output: (image)
Formula: (image)
Correct result: (image)
Upvotes: -1
Views: 9