Arvinth

Reputation: 70

Concatenate and replace strings after groupBy in spark dataframe

I have the DataFrame df below. I need to group by the constraint column and concatenate the status column values.

   +--------------------+-------+
   |          constraint|status |
   +--------------------+-------+
   |Test1               |Success|
   |Test1               |Success|
   |Test2               |Failure|
   |Test2               |Success|
   |Test3               |Success|
   |Test3               |Success|
   |Test4               |Failure|
   |Test4               |Success|
   +--------------------+-------+

Expected Output:

I tried the code below: group by the constraint column, then remove duplicate words and replace 'Success Failure' with 'Failure'.

Please let me know if there is a more optimised way to get the expected output.

Code:

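import org.apache.spark.sql.functions.{collect_list, concat_ws, udf}
import spark.implicits._ // enables the 'constraint symbol syntax
// Join every status of a constraint into one space-separated string.
val result_group = df.groupBy("constraint")
  .agg(concat_ws(" ", collect_list("constraint_status")) as "combined_status")
// Drop duplicate words, e.g. "Success Success" becomes "Success".
val distinct: String => String = _.split(" ").toSet.mkString(",")
val validation_status = udf(distinct)
result_group.select('constraint, validation_status('combined_status).as("distinct")).show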

Upvotes: 1

Views: 535

Answers (2)

Yaroslav Fyodorov

Reputation: 739

Another solution, without conversions

val result_group = df.groupBy("constraint").
                   agg(collect_set($"constraint_status") as "combined_status").
                   withColumn("resulting_status", when(array_contains($"combined_status", lit("Failure")), "Failure").otherwise("Success"))

collect_set aggregates into an array (sic!) of distinct values, so each group ends up holding only "Success", only "Failure", or both. array_contains then checks whether "Failure" is present.
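For anyone who wants to try this against the question's sample rows, here is a minimal self-contained sketch. It assumes the column is called status, as in the sample table (the snippet above uses constraint_status), and that a local SparkSession is acceptable. The object and method names (CollectSetStatus, sample, rollup) are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_contains, col, collect_set, when}

// Hypothetical wrapper object; only the Spark calls are real API.
object CollectSetStatus {
  // Sample rows copied from the question's table (column name "status" is an assumption).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("Test1", "Success"), ("Test1", "Success"),
      ("Test2", "Failure"), ("Test2", "Success"),
      ("Test3", "Success"), ("Test3", "Success"),
      ("Test4", "Failure"), ("Test4", "Success")
    ).toDF("constraint", "status")
  }

  // Collect the distinct statuses per constraint, then flag any group containing "Failure".
  def rollup(df: DataFrame): DataFrame =
    df.groupBy("constraint")
      .agg(collect_set(col("status")).as("combined_status"))
      .withColumn("resulting_status",
        when(array_contains(col("combined_status"), "Failure"), "Failure").otherwise("Success"))

  def main(args: Array[String]): Unit = {
    // Local single-threaded session, for illustration only.
    val spark = SparkSession.builder().master("local[1]").appName("collect-set-status").getOrCreate()
    rollup(sample(spark)).orderBy("constraint").show(truncate = false)
    spark.stop()
  }
}
```

With the sample rows, Test2 and Test4 should come out as Failure and Test1 and Test3 as Success. The order of elements inside combined_status is not guaranteed, which is why the check uses array_contains rather than a fixed position.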

Or you can use array_sort and take the first value (relying on "Failure" sorting before "Success"), but that seems less clear.
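The array_sort alternative could look roughly like the sketch below. It assumes Spark 2.4 or later (where array_sort is available), a column named status as in the question's sample table, and only the two status values "Failure" and "Success". The names SortedStatus, sample and rollup are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_sort, col, collect_set}

// Hypothetical wrapper object; only the Spark calls are real API.
object SortedStatus {
  // A few rows in the shape of the question's table (column name "status" is an assumption).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("Test1", "Success"), ("Test1", "Success"),
      ("Test2", "Failure"), ("Test2", "Success")
    ).toDF("constraint", "status")
  }

  // Sort the distinct statuses ascending and keep the first element.
  // "Failure" sorts before "Success", so any failure in the group wins.
  def rollup(df: DataFrame): DataFrame =
    df.groupBy("constraint")
      .agg(array_sort(collect_set(col("status"))).getItem(0).as("resulting_status"))

  def main(args: Array[String]): Unit = {
    // Local single-threaded session, for illustration only.
    val spark = SparkSession.builder().master("local[1]").appName("sorted-status").getOrCreate()
    rollup(sample(spark)).orderBy("constraint").show(truncate = false)
    spark.stop()
  }
}
```

This only works because of the alphabetical accident between the two labels. Adding a third status, for example one starting with a letter before "F", would silently change the result, which is one reason the array_contains version reads as the safer choice.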

Upvotes: 1

Ranvir Mohanlal

Reputation: 167

This should work efficiently:

val df = spark
  .read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("tmp.csv")
  // Encode each status as a number so the worst one can be picked with max.
  .withColumn("status_code", when($"status" === "Success", 0).otherwise(1))
  .groupBy("constraint")
  .agg(max("status_code").as("status_code"))
  // Decode back: any Failure in the group leaves the code at 1.
  .withColumn("status", when($"status_code" === 0, "Success").otherwise("Failure"))

Output:

df.show()
+----------+-----------+-------+
|constraint|status_code| status|
+----------+-----------+-------+
|     Test2|          1|Failure|
|     Test1|          0|Success|
|     Test3|          1|Failure|
+----------+-----------+-------+

Format of the input csv I used:

constraint|status
Test1|Success
Test1|Success
Test1|Success
Test1|Success
Test2|Success
Test2|Success
Test2|Failure
Test2|Success
Test3|Success
Test3|Failure

Upvotes: 4
