Reputation: 112
I have a stream table of ticks and want to build K-line bars by volume rather than by time: for example, start a new bar for every 10,000 shares traded and calculate the OHLC values of each bar.
How can I implement a function similar to volumeBar on streaming data?
It might be possible with the built-in time-series engine. I've read the parameter descriptions in the official time-series engine documentation (https://docs.dolphindb.com/en/Tutorials/stream_aggregator.html?hl=time-series%2Cstream%2Cengine), but I can't find a use case that covers this.
Upvotes: 0
Views: 18
Reputation: 1
The time-series aggregation engine can only calculate fixed-length sliding windows, and the reactive state engine does not currently support the volumeBar function directly as a state function.
For now, the logic has to be implemented by hand using a class. A reference script follows:
// Goal: group by stock ID, split each group into windows by cumulative volume, and calculate the running average price within each window, row by row.
// Input data
securityID = `st001`st002`st001`st002`st001`st002
time = 2022.01.01 2022.01.01 2022.01.02 2022.01.02 2022.01.03 2022.01.03
price = 30.15 30.21 30.09 30.13 30.18 30.16
volume = 190 212 198 211 205 199
t = table(securityID, time, price, volume)
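// Filter state: flags the row that closes each volume window
class MyVolumeBarFilter{
    totalVol :: LONG
    interval :: LONG
    def MyVolumeBarFilter(interval_) {
        totalVol = 0
        interval = interval_
    }
    // Returns true only on the row that pushes the cumulative volume above interval
    def append(volume) {
        // The previous row closed a window, so start a new one
        if (totalVol > interval) {
            totalVol = 0
        }
        totalVol += volume
        return totalVol>interval
    }
}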
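// Metric state: running average price inside the current volume window
class MyVolumeBar{
    totalVol :: LONG
    sumPrice :: DOUBLE
    count :: LONG
    interval :: LONG
    def MyVolumeBar(interval_) {
        totalVol = 0
        sumPrice = 0.0
        count = 0
        interval = interval_
    }
    // Returns the average price of all rows seen so far in the current window
    def append(volume, price) {
        // Same reset rule as the filter, so both classes stay in step
        if (totalVol > interval) {
            totalVol = 0
            sumPrice = 0.0
            count = 0
        }
        totalVol += volume
        sumPrice += price
        count += 1
        avgPrice = sumPrice\count
        return avgPrice
    }
}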
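// Output table: receives one row per closed volume window
result = table(1:0, `securityID`time`price`volume`avgPrice, [SYMBOL, DATE, DOUBLE, LONG, DOUBLE])
// Drop any engine left over from a previous run
try { dropStreamEngine(`demo) } catch(ex) {}
// State is kept per securityID; the filter outputs only the row that closes a window
rse = createReactiveStateEngine(name="demo", metrics =<[time, price, volume, MyVolumeBar(400).append(volume, price) as `avgPrice]>, dummyTable=t, outputTable=result, keyColumn="securityID", filter=<MyVolumeBarFilter(400).append(volume)>)
// Feed the sample ticks into the engine
rse.append!(t)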
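The script above outputs an average price, while the question asks for OHLC. As a rough illustration of how the same reset rule could be extended to OHLC, here is a minimal sketch in plain Python (it is not DolphinDB code and uses no DolphinDB API). The names `Bar` and `VolumeBarBuilder` are made up for this example. It assumes the same rule as the script: a bar closes on the tick that pushes its cumulative volume strictly above the threshold, and the next tick for that symbol opens a new bar.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Bar:
    """OHLC values and total volume of one volume bar (hypothetical helper type)."""
    open: float
    high: float
    low: float
    close: float
    volume: int


class VolumeBarBuilder:
    """Keeps one open bar per symbol; closes it once volume exceeds `interval`."""

    def __init__(self, interval: int) -> None:
        self.interval = interval
        self._open_bars: Dict[str, Bar] = {}

    def append(self, symbol: str, price: float, volume: int) -> Optional[Bar]:
        """Add one tick. Return the finished bar if this tick closes it, else None."""
        bar = self._open_bars.get(symbol)
        if bar is None:
            # First tick of a new bar: all four prices start at this price.
            bar = Bar(price, price, price, price, 0)
            self._open_bars[symbol] = bar
        else:
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
            bar.close = price
        bar.volume += volume
        if bar.volume > self.interval:
            # Strictly greater than, mirroring `totalVol > interval` in the script.
            del self._open_bars[symbol]
            return bar
        return None


# Same sample ticks as the reference script: (securityID, price, volume)
ticks = [
    ("st001", 30.15, 190), ("st002", 30.21, 212),
    ("st001", 30.09, 198), ("st002", 30.13, 211),
    ("st001", 30.18, 205), ("st002", 30.16, 199),
]

builder = VolumeBarBuilder(400)
closed = []
for symbol, price, volume in ticks:
    finished = builder.append(symbol, price, volume)
    if finished is not None:
        closed.append((symbol, finished))

for symbol, bar in closed:
    print(symbol, bar)
```

With the sample data and a threshold of 400, this closes one bar for st002 on its second tick (volume 423) and one for st001 on its third tick (volume 593). The last st002 tick opens a new bar that stays open.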
Upvotes: 0