Balaji Reddy

Reputation: 5710

Aggregate multiple columns in Spark dataframe

I am having trouble solving the following problem. Basically, I want to find the dates on which a particular item (item_code) sold its maximum and minimum volume.

Input DataFrame

item_code, sold_date, price, volume
101,      10-12-2017, 20,    500
101,      11-12-2017, 20,    400
201,      10-12-2017, 50,    200
201,      13-12-2017, 51,    300

Expected output

Find the max and min volume along with the sold date. I want a solution that does not use any lambda operations.

df.groupBy("item_code").agg(min("volume"), max("volume"))

The above gives me the max and min of volume, but I want them along with their respective dates.

I tried my best with a UDF but could not crack it. Any help is highly appreciated.

Upvotes: 1

Views: 5492

Answers (2)

The best approach here is to add a new column to the DataFrame, built by concatenating the columns required for sorting. Sort on this string-based column in a way that keeps the results numerically ordered (for example, by zero-padding the numeric part), so that the date, and anything else you need to retrieve, is carried along as part of the query.

That way there is no need for JOINs.
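
As a rough sketch of a join-free variant (a swapped-in technique, not necessarily what is described above): instead of concatenating into a string, volume and sold_date can be packed into a struct, and min and max taken over that struct. Spark compares structs field by field, so volume decides the ordering numerically and sold_date is carried along. The names MinMaxVolume, minMaxWithDate, min_rec and max_rec are hypothetical, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, min, struct}

object MinMaxVolume {

  // Hypothetical helper: for each item_code, return the min and max volume
  // together with the sold_date of the row that produced each of them.
  def minMaxWithDate(df: DataFrame): DataFrame = {
    // volume comes first in the struct, so it drives the comparison;
    // sold_date only breaks ties (compared as a string here).
    val rec = struct(col("volume"), col("sold_date"))
    df.groupBy("item_code")
      .agg(min(rec).as("min_rec"), max(rec).as("max_rec"))
      .select(
        col("item_code"),
        col("min_rec.volume").as("min_volume"),
        col("min_rec.sold_date").as("min_sold_date"),
        col("max_rec.volume").as("max_volume"),
        col("max_rec.sold_date").as("max_sold_date"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for demonstration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("min-max-volume")
      .getOrCreate()
    import spark.implicits._

    // Sample data copied from the question.
    val df = Seq(
      (101, "10-12-2017", 20, 500),
      (101, "11-12-2017", 20, 400),
      (201, "10-12-2017", 50, 200),
      (201, "13-12-2017", 51, 300)
    ).toDF("item_code", "sold_date", "price", "volume")

    minMaxWithDate(df).show(false)
    spark.stop()
  }
}
```

If two rows of the same item share the min or max volume, this picks the one whose sold_date string sorts first (for min) or last (for max), rather than returning both rows.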

Upvotes: 0

Ramesh Maharjan

Reputation: 41987

The final output you desire requires a somewhat complex process. You can use the following approach.

Given the input dataframe as

+---------+----------+-----+------+
|item_code|sold_date |price|volume|
+---------+----------+-----+------+
|101      |10-12-2017|20   |500   |
|101      |11-12-2017|20   |400   |
|201      |10-12-2017|50   |200   |
|201      |13-12-2017|51   |300   |
+---------+----------+-----+------+

You can use the following code

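import org.apache.spark.sql.functions._
// Needed for the $"..." column syntax; assumes a SparkSession named spark is in scope
import spark.implicits._

// Step 1: min and max volume per item_code
val tempDF = df.groupBy("item_code").agg(min("volume").as("min"), max("volume").as("max"))

// Step 2: join back to df to pick up the sold_date of the min row
tempDF.as("t2").join(df.as("t1"), col("t1.item_code") === col("t2.item_code") && col("t1.volume") === col("t2.min"), "left")
  .select($"t2.item_code", $"t2.max", concat_ws(",", $"t2.item_code", $"t2.min", $"t1.sold_date").as("min"))
  // Step 3: join back again to pick up the sold_date of the max row
  .join(df.as("t3"), col("t3.item_code") === col("t2.item_code") && col("t3.volume") === col("t2.max"), "left")
  .select($"min", concat_ws(",", $"t3.item_code", $"t2.max", $"t3.sold_date").as("max"))
  .show(false)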

which is going to give you the dataframe you desire

+------------------+------------------+
|min               |max               |
+------------------+------------------+
|101,400,11-12-2017|101,500,10-12-2017|
|201,200,10-12-2017|201,300,13-12-2017|
+------------------+------------------+

Upvotes: 2
