Landon
Landon

Reputation: 1

Issue with Filtering Unwanted Data When Subscribing to 5-Minute OHLC in Reactive Engine

I am using 1 Min OHLC generated from tick. Then 5 min OHLC generated from 1 Min OHLC using updateTime=60000. Both data stored in StreamTable. When I use 5 Min OHLC with reactive engine (or any other engine / table), I get all unwanted data into engine / table. I want to keep the last data of each unique timestamp when I use subscribetable. Refer the photo for more information. Please help me solve the problem.1

Edit 1: Here is the code. python code for insert data into stream table t

s.run("tableInsert(t, (convertTZ(timestamp({3}),'UTC','Asia/Kolkata')), `{0}, {1}, {2}, convertTZ(now(),'Asia/Kolkata','Asia/Kolkata'));".format(str( msg['s']),float( msg['p']),float( msg['q']),int( msg['T'])))

1 Min OHLC

enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm1, cacheSize=1200000, retentionMinutes=15)
OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(vol) as volume, last(upd_tick) as last_tick, now() as upd_1m]>, dummyTable=tmp, outputTable=sm1, timeColumn=`ts, useSystemTime=true, keyColumn=`symbol,  updateTime=60000, useWindowStartTime=false);
subscribeTable(tableName="t", actionName="OHLC_sm1", offset=0, handler=append!{OHLC_sm1}, msgAsTable=true, hash=1)

5 Min OHLC

enableTableShareAndPersistence(table=streamTable(25000:0,ohlc_colNames,ohlc_colTypes), tableName=`sm5, cacheSize=1200000, retentionMinutes=30)
ohlc=<[first(open) as open, max(high) as high, min(low) as low, last(close) as close, sum(volume) as volume, last(upd_1m) as last_tick, now() as upd_5m]>
OHLC_sm5 = createTimeSeriesEngine(name="OHLC_sm5", windowSize=300000, step=300000, metrics=ohlc, dummyTable=tmp, outputTable=sm5, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol, updateTime=60000, useWindowStartTime=true);
subscribeTable(tableName="sm1", actionName="OHLC_sm5", offset=0, handler=append!{OHLC_sm5}, msgAsTable=true, hash=1)

reactive state engine code:

enableTableShareAndPersistence(table=streamTable(25000:0,rsesm5_colNames,rsesm5_colTypes), tableName=`rsesm5, cacheSize=1200000, retentionMinutes=1440)
rse5m = createReactiveStateEngine(name="rse5m", metrics =<[ts,close,mavg(close,4),now()]>, dummyTable=sm5, outputTable=rsesm5, keyColumn="symbol")
subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1)

current output:2

formula :3

Correct Result:4

Upvotes: -1

Views: 9

Answers (0)

Related Questions