H Doucet
H Doucet

Reputation: 77

Why is this inner join returning noting, while it should return many lines items

This code executes an innerjoin operation on an influx.db My expectation is that a new table should be returned with entries that are common for booth input tables. But is does not return is nothing.

Can somebody assist to tell me what i doing wrong.

import "join"

// The first query on the influx DB, returning the left stream 
left =
from(bucket: "IoT_Prod")
  |> range(start: -1d)
   |> filter(fn: (r) => r["_field"] == "aanvoer_temp")
  |> filter(fn: (r) => r["CV_status"] == "hwc")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
  |> yield(name: "hwc")

// The second query on the influx DB, returning the right stream
right =
from(bucket: "IoT_Prod")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_field"] == "geleverd gas")
  |> aggregateWindow(every: 1h , fn: mean, createEmpty: false)
  |> yield(name: "gas")

// The inner join operation that should return a DB with common lines on time entry. 
join.inner(
  left : left,
  right : right,
  // Statement to filter on those lines with time is equal on both streams. 
  on : (l,r) => l._time == r._time,
  // The structucture of the data that should be returned. 
  as : (l,r) => ({join_time: r._time, join_value : r._value, join_field : r._field, join_CV_status : l.CV_status}),
  )

The result is the following output ..

enter image description here

I was expecting the influx to return a new table (measurement) with the collored lines. Since they are common in both tables.

Upvotes: 0

Views: 363

Answers (1)

H Doucet
H Doucet

Reputation: 77

I have found the solution. Each imput table needs to be grouped by _time. As a result flux will created many tables, one per time unit. In my case one table per 1h. A join will group then those tables with similar time units.

import "join"
import "date"

lefti = from(bucket: "IoT_Prod")
  |> range(start: date.truncate(t: -3d, unit: 1h))
  |> filter(fn: (r) => r["_field"] == "aanvoer_temp" and r["CV_status"] == "hwc")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
  //|>limit(n:5)
  |> group (columns: ["_time"])
  |> yield(name: "left_hwc")

right = from(bucket: "IoT_Prod")
  |> range(start: date.truncate(t: -3d, unit: 1h))
  |> filter(fn: (r) => r["_field"] == "geleverd gas" and r["loc"] == "BO")
  |> aggregateWindow(every: 1h , fn: spread, createEmpty: false)
  //|>limit(n:5)
  |> group (columns: ["_time"])
  |> yield(name: "right_gas")

 

// Here we add a join.
join.left(
  left : lefti,
  right : right,
  on : (l,r) => (l._time == r._time),
  as : (l,r) => ({r with cv_status : l.CV_status }),
  //method : "full"
  )

Table 1 - HWC Table 2 - Gas Table 3 - Join Result

Upvotes: 0

Related Questions