bigmacsetnotenough
bigmacsetnotenough

Reputation: 112

How to implement volumeBar K-line in stream computing?

I have a streaming table of ticks and I want to segment K-lines based on volume, for example, creating a new bar for every 10,000 shares traded, and calculating the OHLC values.

How can I implement a function similar to volumeBar on streaming data?

it might be handled with the built-in time-series engine, but I've read the official description of the parameters for the time-series engine documentation(:https://docs.dolphindb.com/en/Tutorials/stream_aggregator.html?hl=time-series%2Cstream%2Cengine), but I can't find a use case for it.

Upvotes: 0

Views: 18

Answers (1)

元圆圆
元圆圆

Reputation: 1

Since the time-series aggregation engine can only calculate fixed-length sliding windows, and the state engine currently does not directly support the volumeBar function as a state function for calculation.

For now, we can only implement the logic ourselves using a class, with a reference script as follows:

// Function: Group by stock ID, segment windows based on the cumulative volume, and calculate the average price of each window entry by entry.
// Input data
securityID = `st001`st002`st001`st002`st001`st002
time = 2022.01.01 2022.01.01 2022.01.02 2022.01.02 2022.01.03 2022.01.03
price = 30.15 30.21 30.09 30.13 30.18 30.16
volume = 190 212 198 211 205 199
t = table(securityID, time, price, volume)

class MyVolumeBarFilter{
        totalVol :: LONG 
        interval :: LONG
        def MyVolumeBarFilter(interval_) {
                groupNum = 0
                totalVol = 0
                interval = interval_
        } 
        def append(volume) {
                if (totalVol > interval) {
                        totalVol = 0
                }
                totalVol += volume
                return totalVol>interval
        }
}

class MyVolumeBar{
        groupNum :: LONG 
        totalVol :: LONG 
        sumPrice :: DOUBLE
        count :: LONG
        interval :: LONG
        def MyVolumeBar(interval_) {
                totalVol = 0
                sumPrice = 0.0
                count = 0
                interval = interval_
        } 
        def append(volume, price) {
                if (totalVol > interval) {
                        totalVol = 0
                        sumPrice = 0.0
                        count = 0        
                }
                totalVol += volume
                sumPrice += price
                count += 1
                avgPrice = sumPrice\count
                return avgPrice
        }
}
result = table(1:0, `securityID`time`price`volume`avgPrice, [SYMBOL, DATE, DOUBLE, LONG, DOUBLE])
try { dropStreamEngine(`demo) } catch(ex) {}
rse = createReactiveStateEngine(name="demo", metrics =<[time, price, volume, MyVolumeBar(400).append(volume, price) as `avgPrice]>, dummyTable=t, outputTable=result, keyColumn="securityID", filter=<MyVolumeBarFilter(400).append(volume)>)
rse.append!(t)

Upvotes: 0

Related Questions