eemilk

Reputation: 1628

Spark: Create new columns containing the min and max of corresponding values in another column

Let's say I have a dataframe:

    import spark.implicits._
    
    val simpleData = Seq(("James", "Sales", 3000),
        ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100),
        ("Maria", "Finance", 3000),
        ("James", "Sales", 3000),
        ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900),
        ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000),
        ("Saif", "Sales", 4100))
        
    val df_1 = simpleData.toDF("employee_name", "department", "salary")
    +-------------+----------+------+
    |employee_name|department|salary|
    +-------------+----------+------+
    |        James|     Sales|  3000|
    |      Michael|     Sales|  4600|
    |       Robert|     Sales|  4100|
    |        Maria|   Finance|  3000|
    |        James|     Sales|  3000|
    |        Scott|   Finance|  3300|
    |          Jen|   Finance|  3900|
    |         Jeff| Marketing|  3000|
    |        Kumar| Marketing|  2000|
    |         Saif|     Sales|  4100|
    +-------------+----------+------+

In an ideal case I would like to add the columns min_department_salary, max_department_salary, min_salary_employee_name and max_salary_employee_name to the original dataframe. For every row, these would tell what the minimum and maximum salary in that row's department are, and who is getting them.

So the first row would be: James, Sales, 3000, 3000, 4600, James, Michael

What I have so far is:

    import org.apache.spark.sql.functions._

    val df_1_5 = df_1.groupBy('department)
                    .agg(min('salary).as("min_department_salary"), max('salary).as("max_department_salary"))
    +----------+---------------------+---------------------+
    |department|min_department_salary|max_department_salary|
    +----------+---------------------+---------------------+
    |     Sales|                 3000|                 4600|
    |   Finance|                 3000|                 3900|
    | Marketing|                 2000|                 3000|
    +----------+---------------------+---------------------+

This is not quite there yet. I have tried joining it back to the original df, but I would like to avoid joins because my dataframe is quite big.

Upvotes: 2

Views: 1039

Answers (2)

Arun kumar

Reputation: 11

Use window aggregate functions rather than regular aggregate functions:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Order by salary within each department and let the frame cover the whole
    // partition; sorting the DataFrame beforehand does not fix the order first/last see
    val byDept = Window.partitionBy($"department").orderBy($"salary")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    val df2 = df_1
      .withColumn("min_department_salary", min("salary") over byDept)
      .withColumn("max_department_salary", max("salary") over byDept)
      .withColumn("min_salary_employee_name", first("employee_name") over byDept)
      .withColumn("max_salary_employee_name", last("employee_name") over byDept)
      .select("employee_name", "department", "salary",
              "min_department_salary", "max_department_salary",
              "min_salary_employee_name", "max_salary_employee_name")
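With a window ordered by salary whose frame spans the whole department, first/last amount to sorting each department by salary and reading off both ends. The following is a plain-Scala sketch of that idea, not Spark code; it assumes the sample rows from the question, and the names rows and ends are hypothetical.

```scala
// Sample rows from the question: (employee_name, department, salary)
val rows: Seq[(String, String, Int)] = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100))

// Sort each department by salary, then take both ends of the sorted group.
// This mirrors first/last over a window ordered by salary whose frame runs
// from Window.unboundedPreceding to Window.unboundedFollowing.
val ends: Map[String, ((String, Int), (String, Int))] =
  rows.groupBy(_._2).map { case (dept, group) =>
    val sorted = group.sortBy(_._3).map { case (name, _, salary) => (name, salary) }
    (dept, (sorted.head, sorted.last))
  }

ends.toSeq.sortBy(_._1).foreach { case (dept, ((minName, minSal), (maxName, maxSal))) =>
  println(s"$dept: lowest $minName ($minSal), highest $maxName ($maxSal)")
}
```

Scala's sortBy is stable, so equal salaries keep their input order here; Spark makes no such promise, so when several employees share the minimum or maximum salary, the name picked by first/last may vary between runs.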

Upvotes: 1

koiralo

Reputation: 23109

You can use a struct to carry the other column along, as below. Structs are compared field by field, so putting salary first means min/max are decided by salary and the matching employee name comes with it.

    import org.apache.spark.sql.functions._

    // Pack (salary, employee_name) into one column so min/max return both
    df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
      .groupBy('department)
      .agg(min("sal-name").as("min"), max("sal-name").as("max"))
      .select($"department", $"min.*", $"max.*")
      .toDF("department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
      .show(false)

Output:

    +----------+-------+------------+-------+------------+
    |department|min_sal|min_sal_name|max_sal|max_sal_name|
    +----------+-------+------------+-------+------------+
    |Sales     |3000   |James       |4600   |Michael     |
    |Finance   |3000   |Maria       |3900   |Jen         |
    |Marketing |2000   |Kumar       |3000   |Jeff        |
    +----------+-------+------------+-------+------------+
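The struct trick relies on ordering by the first field and only falling back to the second on a tie. Scala tuples have the same lexicographic Ordering, so the idea can be sketched in plain Scala without Spark. This is an illustration under that analogy, assuming the question's sample rows; rows and minMax are hypothetical names.

```scala
// Sample rows from the question: (employee_name, department, salary)
val rows: Seq[(String, String, Int)] = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100))

// Analogue of struct($"salary", $"employee_name") followed by
// groupBy("department").agg(min(...), max(...))
val minMax: Map[String, ((Int, String), (Int, String))] =
  rows.groupBy(_._2).map { case (dept, group) =>
    val pairs = group.map { case (name, _, salary) => (salary, name) }
    // Tuple2 ordering compares salary first, then name only on a tie
    (dept, (pairs.min, pairs.max))
  }

minMax.toSeq.sortBy(_._1).foreach { case (dept, ((minSal, minName), (maxSal, maxName))) =>
  println(s"$dept: min $minSal ($minName), max $maxSal ($maxName)")
}
```

Because the name is the tie-breaker, equal salaries resolve deterministically: of (4100, "Robert") and (4100, "Saif"), the maximum is the one with "Saif".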

If you want to keep all the rows, use a window function instead of groupBy:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val window = Window.partitionBy("department")

    // Same struct trick, evaluated over the department window so no rows are dropped
    df_1.withColumn("sal-name", struct($"salary", $"employee_name"))
      .withColumn("min", min("sal-name").over(window))
      .withColumn("max", max("sal-name").over(window))
      .select($"employee_name", $"department", $"min.*", $"max.*")
      .toDF("employee_name", "department", "min_sal", "min_sal_name", "max_sal", "max_sal_name")
      .show(false)

Output:

    +-------------+----------+-------+------------+-------+------------+
    |employee_name|department|min_sal|min_sal_name|max_sal|max_sal_name|
    +-------------+----------+-------+------------+-------+------------+
    |James        |Sales     |3000   |James       |4600   |Michael     |
    |Michael      |Sales     |3000   |James       |4600   |Michael     |
    |Robert       |Sales     |3000   |James       |4600   |Michael     |
    |James        |Sales     |3000   |James       |4600   |Michael     |
    |Saif         |Sales     |3000   |James       |4600   |Michael     |
    |Maria        |Finance   |3000   |Maria       |3900   |Jen         |
    |Scott        |Finance   |3000   |Maria       |3900   |Jen         |
    |Jen          |Finance   |3000   |Maria       |3900   |Jen         |
    |Jeff         |Marketing |2000   |Kumar       |3000   |Jeff        |
    |Kumar        |Marketing |2000   |Kumar       |3000   |Jeff        |
    +-------------+----------+-------+------------+-------+------------+
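The difference between the groupBy and window versions is that the window attaches each department's values to every original row instead of collapsing the group. A plain-Scala sketch of that widening step, producing the layout asked for in the question (name, department, salary, min salary, max salary, min-salary name, max-salary name); it is not Spark code, it assumes the question's sample rows, and rows, stats and widened are hypothetical names.

```scala
// Sample rows from the question: (employee_name, department, salary)
val rows: Seq[(String, String, Int)] = Seq(
  ("James", "Sales", 3000), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100))

// Per-department (min, max) as (salary, name) pairs, like min/max of the
// struct column over Window.partitionBy("department")
val stats: Map[String, ((Int, String), (Int, String))] =
  rows.groupBy(_._2).map { case (dept, group) =>
    val pairs = group.map { case (name, _, salary) => (salary, name) }
    (dept, (pairs.min, pairs.max))
  }

// Keep every original row and append its department's values
val widened: Seq[(String, String, Int, Int, Int, String, String)] =
  rows.map { case (name, dept, salary) =>
    val ((minSal, minName), (maxSal, maxName)) = stats(dept)
    (name, dept, salary, minSal, maxSal, minName, maxName)
  }

widened.foreach(println)
```

The first element of widened is (James,Sales,3000,3000,4600,James,Michael), which matches the first row the question describes.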

Upvotes: 3
