Tima

Reputation: 161

InfluxDB Flux join series

I have the following data in InfluxDB:

server,operation=ADD queryMs=7.9810 1620608972904452000
server,operation=GET queryMs=12.2430 1620608972909339200
server,operation=UPDATE queryMs=11.5780 1620608972909655400
server,operation=ADD queryMs=11.2460 1620608972910445700
server,operation=GET queryMs=15.0620 1620608972911305000
etc...

So in my graph I see three series, one per operation.

I want a single series that combines all operations.

I tried |> group(columns: ["_field"]), which gives the result I need, but the query is extremely slow!

from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  |> group(columns: ["_field"])
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

Is there a faster solution to my problem?

Upvotes: 2

Views: 2155

Answers (2)

dmgeurts

Reputation: 147

This is an old thread, but for anyone who comes across the first answer: there is no need for union when a plain filter does the job.

from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  |> filter(fn: (r) => r["operation"] == "GET" or
    r["operation"] == "ADD" or
    r["operation"] == "UPDATE")
  |> drop(columns:["operation"])
  |> sort(columns: ["_time"], desc: false)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

I can only assume that the union query was faster because filtering reduced the size of the returned dataset, not because of union itself.

Upvotes: 0

Tima

Reputation: 161

This works faster:

union(tables: [
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "GET")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "ADD")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  from(bucket: "initial")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "server")
    |> filter(fn: (r) => r["_field"] == "queryMs")
    |> filter(fn: (r) => r["operation"] == "UPDATE")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false),
  ])
  |> drop(columns:["operation"])
  |> sort(columns: ["_time"], desc: false)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
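
One possible reason this is faster (an assumption, not something I measured) is that each branch runs aggregateWindow on its own series before the tables are merged, so far fewer rows reach the merge step. If that is the case, the same two-pass idea can be sketched without union. This is only a sketch that reuses the bucket, measurement and field names from the question:

```flux
from(bucket: "initial")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "server")
  |> filter(fn: (r) => r["_field"] == "queryMs")
  // First pass: downsample each operation series separately,
  // while the data is still grouped by the "operation" tag.
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  // Merge the per-operation tables into one table.
  |> group(columns: ["_field"])
  |> sort(columns: ["_time"], desc: false)
  // Second pass: average the per-operation means in each window.
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")
```

Note that, like the union query, this yields a mean of per-operation means. That differs from the mean over all raw points whenever the operations have different numbers of points in a window.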

Upvotes: 4
