user3407267
user3407267

Reputation: 1624

How to do window function inside a when?

I have a dataframe where I am trying to do window function on a array column.

The logic is as follows: Group by (or window partition) the id and filtered columns. Calculate the max score of the rows where the types column is null, otherwise take the score of that row. When score is not equal to the max score of the group add "NA" to the column type.

val data = spark.createDataFrame(Seq(
  (1, "shirt for women", Seq("shirt", "women"), 19.1, "ST"),
  (1, "shirt for women", Seq("shirt", "women"), 10.1, null),
  (1, "shirt for women", Seq("shirt", "women"), 12.1, null),
  (0, "shirt group women", Seq("group", "women"), 15.1, null),
  (0, "shirt group women", Seq("group", "women"), 12.1, null),
  (3, "shirt nmn women", Seq("shirt", "women"), 16.1, "ST"),
  (3, "shirt were women", Seq("shirt", "women"), 13.1, "ST")
)).toDF("id", "raw", "filtered", "score", "types")

  +---+-----------------+--------------+-----+-----+
|id |raw              |filtered      |score|types|
+---+-----------------+--------------+-----+-----+
|1  |shirt for women  |[shirt, women]|19.1 |ST   |
|1  |shirt for women  |[shirt, women]|10.1 |null |
|1  |shirt for women  |[shirt, women]|12.1 |null |
|0  |shirt group women|[group, women]|15.1 |null |
|0  |shirt group women|[group, women]|12.1 |null |
|3  |shirt nmn women  |[shirt, women]|16.1 |ST   |
|3  |shirt were women |[shirt, women]|13.1 |ST   |
+---+-----------------+--------------+-----+-----+

Expected output:

   +---+------------------+--------------+-----+----+
    |id |raw              |filtered      |score|types|
    +---+-----------------+--------------+-----+----+
    |1  |shirt for women  |[shirt, women]|19.1 |ST  |
    |1  |shirt for women  |[shirt, women]|10.1 |NA  |
    |1  |shirt for women  |[shirt, women]|12.1 |null|
    |0  |shirt group women[women, group] |15.1 |null|
    |0  |shirt group women|[women, group]|12.1 |NA  |
    |3  |shirt nmn women  |[shirt, women]|16.1 |ST  |
    |3  |shirt were women |[shirt, women]|13.1 |ST  |
    +---+-----------------+--------------+-----+----+

I tried:

data.withColumn("max_score",
      when(col("types").isNull,
        max("score")
          .over(Window.partitionBy("id", "filtered")))
        .otherwise($"score"))
      .withColumn("type_temp",
        when(col("score") =!= col("max_score"),
          addReasonsUDF(col("type"),
            lit("NA")))
          .otherwise(col("type")))
      .drop("types", "max_score")
      .withColumnRenamed("type_temp", "types")

But it is not working. This gives me:

+---+-----------------+--------------+-----+---------+-----+
|id |raw              |filtered      |score|max_score|types|
+---+-----------------+--------------+-----+---------+-----+
|1  |shirt for women  |[shirt, women]|19.1 |19.1     |ST   |
|1  |shirt women      |[shirt, women]|10.1 |19.1     |NA   |
|1  |shirt of women   |[shirt, women]|12.1 |19.1     |NA   |
|0  |shirt group women|[group, women]|15.1 |15.1     |null |
|0  |shirt will women |[group, women]|12.1 |15.1     |NA   |
|3  |shirt nmn women  |[shirt, women]|16.1 |16.1     |ST   |
|3  |shirt were women |[shirt, women]|13.1 |13.1     |ST   |
+---+-----------------+--------------+-----+---------+-----+

Can some one tell me what I am doing wrong here ?

I think something wrong with my window function, when I tried partition against id and raw its not working as well. So both string and array partitions are not working.

dataSet.withColumn("max_score",
      when(col("types").isNull,
        max("score").over(Window.partitionBy("id", "raw")))
        .otherwise($"score")).show(false)

+---+-----------------+--------------+-----+-----+---------+
|id |raw              |filtered      |score|types|max_score|
+---+-----------------+--------------+-----+-----+---------+
|3  |shirt nmn women  |[shirt, women]|16.1 |ST   |16.1     |
|0  |shirt group women|[group, women]|15.1 |null |15.1     |
|0  |shirt group women|[group, women]|12.1 |null |15.1     |
|3  |shirt were women |[shirt, women]|13.1 |ST   |13.1     |
|1  |shirt for women  |[shirt, women]|19.1 |ST   |19.1     |
|1  |shirt for women  |[shirt, women]|10.1 |null |19.1     |
|1  |shirt for women  |[shirt, women]|12.1 |null |19.1     |
+---+-----------------+--------------+-----+-----+---------+

Upvotes: 0

Views: 778

Answers (1)

Shaido
Shaido

Reputation: 28322

You do not need to have a window function inside a when expression, instead this can be done in two stages. First add the max score as a new column to each group based on the id, filtered and the types columns. This will give a max score specifically for the groups where types are null. A window expression is preferred for this since the other columns should be kept.

After this, a check with when/otherwise can be done to change the value of the types column as long as types have a null value and the max score is not equal to score.

In code:

val w = Window.partitionBy("id", "filtered", "types")
val df = data.withColumn("max_score", max($"score").over(w))
  .withColumn("types", when($"types".isNull && $"score" =!= $"max_score", "NA").otherwise($"types"))
  .drop("max_score")

Upvotes: 2

Related Questions