Reputation: 11
I am using InfluxDB 2.7.6 and have created a measurement called cgm_glucose_history. It has a tag called device_sn, a field called glucose, and a field called device_time that records the exact time at which the glucose reading was generated. In this measurement, each device_sn produces one glucose value per minute.
I use the Java client to write the data:
List<Point> pointList = new ArrayList<>();
for (CgmGlucose cgmGlucose : list) {
    // Align the timestamp to the minute: 00:00, 00:01, 00:02, 00:03, ...
    // getDeviceTime() is assumed to be epoch milliseconds (matching WritePrecision.MS),
    // so one minute is 60_000 ms.
    long time = cgmGlucose.getDeviceTime() - cgmGlucose.getDeviceTime() % 60_000L;
    Point point = Point.measurement(MEASUREMENT_CGM_GLUCOSE_HISTORY)
            .time(time, WritePrecision.MS)
            .addTag("patient_code", cgmGlucose.getPatientCode())
            .addTag("device_sn", cgmGlucose.getDeviceSn())
            .addField("glucose", cgmGlucose.getGlucose())
            .addField("device_time", cgmGlucose.getDeviceTime());
    pointList.add(point);
    // Flush in batches of 1000 points.
    if (pointList.size() >= 1000) {
        writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
        pointList.clear();
    }
}
// Write whatever is left over after the last full batch.
if (!pointList.isEmpty()) {
    writeApi.writePoints(InfluxConfig.getBucket(), InfluxConfig.getOrg(), pointList);
    pointList.clear();
}
writeApi.close();
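The comment in the loop describes aligning each timestamp to the minute. Here is a minimal sketch of that truncation, assuming getDeviceTime() returns epoch milliseconds (which WritePrecision.MS suggests). The class and method names are hypothetical and not part of the InfluxDB client. For a millisecond timestamp, a modulus of 1000 only strips the millisecond part, while a modulus of 60,000 strips the seconds as well.

```java
final class MinuteAlignment {
    static final long MILLIS_PER_SECOND = 1_000L;
    static final long MILLIS_PER_MINUTE = 60_000L;

    // Truncate an epoch-millisecond timestamp to the start of its second.
    static long alignToSecond(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_SECOND);
    }

    // Truncate an epoch-millisecond timestamp to the start of its minute.
    static long alignToMinute(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_MINUTE);
    }

    public static void main(String[] args) {
        long deviceTime = 1718679994123L; // hypothetical reading time in epoch ms
        System.out.println(alignToSecond(deviceTime)); // 1718679994000
        System.out.println(alignToMinute(deviceTime)); // 1718679960000
    }
}
```

If two readings from the same device land in the same aligned minute with the same tag set, the later write overwrites the earlier one, so the alignment granularity matters for how many points are actually stored.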
I now need to find the time at which a given device_sn first had a sustained hyperglycemia event, if it had one. Sustained hyperglycemia is defined as a glucose value above 13.9 for more than two hours. I tried the window() and reduce() functions, and the query is as follows:
import "interpolate"
from(bucket: "cdm_dm")
    |> range(start: 1718679994)
    |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
    |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
    |> filter(fn: (r) => r["_field"] == "glucose" or r["_field"] == "device_time")
    |> map(fn: (r) => ({r with _value: float(v: r._value)}))
    |> interpolate.linear(every: 1m)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> window(every: 1m, period: 122m)
    // The core logic is to count the points where the glucose value is greater than 13.9 within two hours.
    |> reduce(
        fn: (r, accumulator) => ({
            count: if r.glucose > 13.9 then accumulator.count + 1 else 0,
            event_start_time: if r.glucose > 13.9 then r.device_time else 0.0,
            glucose: if (r.glucose > 13.9 and accumulator.count == 121) then r.glucose
                else if (r.glucose > 13.9 and accumulator.count < 121) then accumulator.glucose
                else 0.0
        }),
        identity: {count: 0, event_start_time: 0.0, glucose: 0.0}
    )
    |> duplicate(column: "_start", as: "_time")
    |> window(every: inf)
    |> filter(fn: (r) => r["count"] == 122)
    |> limit(n: 1)
In practice, however, the query is very slow: even with very little test data, it takes more than 30 seconds to return a result. Is there something wrong with my logic? Is there a better way to do this in InfluxDB?
I have no further ideas on how to optimize this query or the data structure.
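One likely contributor to the run time, offered as a rough estimate rather than a profile of the actual query: window(every: 1m, period: 122m) creates overlapping windows, so each one-minute row is copied into roughly 122 windows, and reduce() then runs once per window. The Java sketch below only counts that work for a hypothetical number of rows and compares it with a single pass over the data. The class and method names are made up for illustration, and edge effects at the start and end of the range are ignored.

```java
final class WindowCost {
    // Rows processed when every row is copied into each overlapping window that covers it.
    // Assumes evenly spaced rows, one per `everyMinutes`.
    static long overlappingWindowRows(long rows, long everyMinutes, long periodMinutes) {
        long windowsPerRow = periodMinutes / everyMinutes;
        return rows * windowsPerRow;
    }

    // Rows processed by a single linear scan.
    static long singlePassRows(long rows) {
        return rows;
    }

    public static void main(String[] args) {
        long rows = 7L * 24L * 60L; // one week of one-minute readings (hypothetical)
        System.out.println(singlePassRows(rows));                 // 10080
        System.out.println(overlappingWindowRows(rows, 1L, 122L)); // 1229760
    }
}
```

Under these assumptions the windowed approach touches about 122 times as many rows as a single scan, before counting the cost of interpolate.linear() and pivot().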
Upvotes: 0
Views: 33
Reputation: 11
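I found a function that meets this need: stateDuration(). With its default unit of 1s, the stateDuration column is in seconds, so filtering on more than 120*60 keeps only rows where glucose has been above 13.9 for more than two hours.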
from(bucket: "cdm_dm")
    |> range(start: 1718679994)
    |> filter(fn: (r) => r["_measurement"] == "cgm_glucose_history")
    |> filter(fn: (r) => r["device_sn"] == "TT22222AN2")
    |> filter(fn: (r) => r["_field"] == "glucose")
    |> stateDuration(fn: (r) => r._value > 13.9)
    |> filter(fn: (r) => r["stateDuration"] > 120 * 60)
    |> limit(n: 1)
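To make the behaviour concrete, here is a small Java emulation of what stateDuration() computes, based on its documented semantics: the duration is -1 while the predicate is false, 0 on the first row in the state, and otherwise the time elapsed since that first row. It is a sketch, not the Flux implementation, and the class and method names are hypothetical. It also shows that the first row the query returns marks the moment the two-hour threshold is crossed, so the start of the event is that row's time minus its stateDuration.

```java
import java.util.OptionalLong;

final class SustainedHigh {
    static final double THRESHOLD = 13.9;        // same threshold as the query
    static final long MIN_SECONDS = 120L * 60L;  // two hours, in seconds

    // Emulates stateDuration(fn: (r) => r._value > 13.9) with unit 1s.
    // Timestamps are epoch seconds, sorted ascending.
    static long[] stateDurations(long[] epochSeconds, double[] glucose) {
        long[] durations = new long[glucose.length];
        boolean inState = false;
        long stateStart = 0L;
        for (int i = 0; i < glucose.length; i++) {
            if (glucose[i] > THRESHOLD) {
                if (!inState) {
                    inState = true;
                    stateStart = epochSeconds[i]; // first row of this run
                }
                durations[i] = epochSeconds[i] - stateStart;
            } else {
                inState = false;
                durations[i] = -1L; // reset when the predicate is false
            }
        }
        return durations;
    }

    // Start time of the first run that stays above the threshold for more than two hours.
    static OptionalLong firstEventStart(long[] epochSeconds, double[] glucose) {
        long[] durations = stateDurations(epochSeconds, glucose);
        for (int i = 0; i < durations.length; i++) {
            if (durations[i] > MIN_SECONDS) {
                return OptionalLong.of(epochSeconds[i] - durations[i]);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        int n = 200;
        long[] times = new long[n];
        double[] glucose = new double[n];
        for (int i = 0; i < n; i++) {
            times[i] = 1718679960L + 60L * i;                 // one reading per minute
            glucose[i] = (i >= 10 && i < 140) ? 15.0 : 8.0;   // 130 consecutive high readings
        }
        System.out.println(firstEventStart(times, glucose)); // OptionalLong[1718680560]
    }
}
```

This is a single linear pass over the readings, which is why the stateDuration() query avoids the overlapping windows of the earlier attempt.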
Upvotes: 1