Swapnilup

Reputation: 13

Apply function on column based on a different column value

I want to apply function to columns in a data frame. The function to be applied depends on the value of one of the columns in a data frame. The function to value mapping is available as a Map.

Source DF:

TAG       Timestamp              Value
TAG1    2019-06-21 01:16:00.0   621.0947
TAG1    2019-06-21 01:16:00.0   621.0947
TAG1    2019-06-21 01:16:00.0   621.0947
TAG1    2019-06-21 01:16:00.0   619.9578
TAG2    2019-06-21 01:29:00.0   767.5475
TAG2    2019-06-21 01:29:00.0   768.9506
TAG2    2019-06-21 01:29:00.0   770.8863
TAG3    2019-06-21 01:16:00.0   203.7457

Map:

Map(Tag1 -> avg, Tag2 -> max, Tag3 -> min)

Output:

TAG Timestamp            Value
TAG1    2019-06-21 01:16:00.0   620.810475  (avg applied for Tag1 values)
TAG2    2019-06-21 01:29:00.0   770.8863    (max applied)
TAG3    2019-06-21 01:16:00.0   203.7457    (min applied)

I am able to get to the point where all values are aggregated into a single column per tag; where I am stuck is applying the functions dynamically.

Nothing is in a working state yet. My idea was to collect the values as a list in a column and then try to apply the function to that list.

val grouped = df.groupBy("TAG").agg(collect_list("value") as "value")

Output:

TAG     value
TAG1    [621.0947, 621.0947, 621.0947, 619.9578]
TAG2    [767.5475, 768.9506, 770.8863]
TAG3    [203.7457]
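One way that idea could be finished (a sketch, assuming a Scala Map keyed exactly by the TAG values, e.g. "TAG1" rather than "Tag1") is to dispatch inside a UDF over the collected list:

import org.apache.spark.sql.functions._

// Hypothetical tag-to-function map; keys assumed to match the TAG values exactly.
val tagFuncs = Map("TAG1" -> "avg", "TAG2" -> "max", "TAG3" -> "min")

// UDF that reduces the collected list according to the tag's function.
val applyFunc = udf((tag: String, values: Seq[Double]) => tagFuncs(tag) match {
  case "avg" => values.sum / values.size
  case "max" => values.max
  case "min" => values.min
})

df.groupBy("TAG", "Timestamp")
  .agg(collect_list("Value") as "values")
  .withColumn("Value", applyFunc(col("TAG"), col("values")))
  .drop("values")
  .show()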

Upvotes: 1

Views: 67

Answers (1)

Ranga Vure

Reputation: 1932

You can use when...otherwise, which works like a CASE statement:

import spark.implicits._
import org.apache.spark.sql.functions._

var df = Seq(
  ("TAG1", "2019-06-21 01:16:00.0", 621.0947),
  ("TAG1", "2019-06-21 01:16:00.0", 621.0947),
  ("TAG1", "2019-06-21 01:16:00.0", 621.0947),
  ("TAG1", "2019-06-21 01:16:00.0", 619.9578),
  ("TAG2", "2019-06-21 01:29:00.0", 767.5475),
  ("TAG2", "2019-06-21 01:29:00.0", 768.9506),
  ("TAG2", "2019-06-21 01:29:00.0", 770.8863),
  ("TAG3", "2019-06-21 01:16:00.0", 203.7457)
).toDF("TAG", "Timestamp", "Value")

df.groupBy(
  "TAG", "Timestamp"
).agg(
  when(
    col("TAG") === "TAG1", avg("Value")
  ).otherwise(
    when(col("TAG") === "TAG2", max("Value")).otherwise(min("Value"))
  ).as("Value")
).show()
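If you would rather drive this from the Map itself instead of hard-coding each tag, one possible sketch (assuming the map keys match the TAG values exactly) is to fold the map into a single when/otherwise chain:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Hypothetical tag-to-aggregate map; keys assumed to match the TAG values exactly.
val tagFuncs: Map[String, Column => Column] = Map(
  "TAG1" -> ((c: Column) => avg(c)),
  "TAG2" -> ((c: Column) => max(c)),
  "TAG3" -> ((c: Column) => min(c))
)

// Fold the map into one nested when/otherwise expression, so no tag is hard-coded.
val aggExpr = tagFuncs.foldLeft(lit(null).cast("double")) {
  case (acc, (tag, fn)) => when(col("TAG") === tag, fn(col("Value"))).otherwise(acc)
}

df.groupBy("TAG", "Timestamp").agg(aggExpr.as("Value")).show()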

Upvotes: 1
