Reputation: 70
I have the DataFrame df below. I need to group by the constraint column and combine the values of the status column.
+--------------------+-------+
|          constraint|status |
+--------------------+-------+
|Test1               |Success|
|Test1               |Success|
|Test2               |Failure|
|Test2               |Success|
|Test3               |Success|
|Test3               |Success|
|Test4               |Failure|
|Test4               |Success|
+--------------------+-------+
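Expected output: a constraint is Success only if all of its rows are Success; if any row is Failure, the result is Failure.
Success, Success -> Success
Success, Failure -> Failure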
+--------------------+-------+
|          constraint|status |
+--------------------+-------+
|Test1               |Success|
|Test2               |Failure|
|Test3               |Success|
|Test4               |Failure|
+--------------------+-------+
I tried the code below: group by the constraint column, concatenate the statuses, remove duplicate words, and then replace the Success/Failure combination with Failure.
Please let me know an optimized way of getting the expected output.
Code:
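import org.apache.spark.sql.functions.{collect_list, concat_ws, udf}
import spark.implicits._
// Concatenate all statuses of each constraint into one space-separated string
val result_group = df.groupBy("constraint")
  .agg(concat_ws(" ", collect_list("status")) as "combined_status")
// Remove duplicate words, e.g. "Success Success" -> "Success"
val distinct: String => String = _.split(" ").toSet.mkString(",")
val validation_status = udf(distinct)
result_group.select($"constraint", validation_status($"combined_status").as("distinct")).show()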
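To finish the dedupe-then-replace idea without depending on whether the set prints as "Success,Failure" or "Failure,Success", the UDF body can test membership instead of matching a string. Below is a minimal plain-Scala sketch (no Spark needed) that emulates the groupBy/concat_ws step on the sample rows from the question; the helper name resolveStatus is hypothetical.

```scala
// Sample rows copied from the df shown in the question.
val rows: Seq[(String, String)] = Seq(
  "Test1" -> "Success", "Test1" -> "Success",
  "Test2" -> "Failure", "Test2" -> "Success",
  "Test3" -> "Success", "Test3" -> "Success",
  "Test4" -> "Failure", "Test4" -> "Success"
)

// Hypothetical UDF body: split the space-joined statuses, deduplicate them,
// and return "Failure" if any of the distinct words is "Failure".
def resolveStatus(combined: String): String = {
  val distinctWords = combined.split(" ").toSet
  if (distinctWords.contains("Failure")) "Failure" else "Success"
}

// Emulates groupBy("constraint") + concat_ws(" ", collect_list("status")).
val combinedStatus: Map[String, String] =
  rows.groupBy(_._1).map { case (constraint, rs) => constraint -> rs.map(_._2).mkString(" ") }

// Applies the UDF body to every group.
val resolved: Map[String, String] =
  combinedStatus.map { case (constraint, s) => constraint -> resolveStatus(s) }
```

In Spark the same function could be wrapped with udf(resolveStatus _) and applied to combined_status in place of validation_status.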
Upvotes: 1
Views: 535
Reputation: 739
Another solution, without string conversions or a UDF:
import org.apache.spark.sql.functions.{array_contains, collect_set, lit, when}
import spark.implicits._
val result_group = df.groupBy("constraint")
  .agg(collect_set($"status") as "combined_status")
  .withColumn("resulting_status",
    when(array_contains($"combined_status", lit("Failure")), "Failure").otherwise("Success"))
collect_set aggregates the statuses into an array (sic!) of distinct values, so each group holds ["Success"], ["Failure"], or both "Success" and "Failure" (in no guaranteed order). array_contains then checks whether "Failure" is present.
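The collect_set / array_contains logic can be mirrored in plain Scala to check it against the sample data. This sketch does not use Spark; it assumes a Scala Set is a fair stand-in for the array that collect_set produces, since both hold distinct values with no order to rely on.

```scala
// Sample rows copied from the df shown in the question.
val rows: Seq[(String, String)] = Seq(
  "Test1" -> "Success", "Test1" -> "Success",
  "Test2" -> "Failure", "Test2" -> "Success",
  "Test3" -> "Success", "Test3" -> "Success",
  "Test4" -> "Failure", "Test4" -> "Success"
)

// Stand-in for collect_set($"status"): distinct statuses per constraint.
val combinedStatus: Map[String, Set[String]] =
  rows.groupBy(_._1).map { case (constraint, rs) => constraint -> rs.map(_._2).toSet }

// Stand-in for when(array_contains(..., "Failure"), "Failure").otherwise("Success").
val resultingStatus: Map[String, String] =
  combinedStatus.map { case (constraint, statuses) =>
    constraint -> (if (statuses.contains("Failure")) "Failure" else "Success")
  }
```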
Alternatively, you can apply array_sort and take the first element, relying on "Failure" sorting before "Success". But that seems less clear.
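A sketch of the array_sort variant, again modeled in plain Scala rather than Spark. It works only under the assumption that the status column contains nothing but "Success" and "Failure", because "Failure" sorts before "Success" ('F' comes before 'S'). In Spark 2.4 or later the matching column expression would presumably be element_at(array_sort(collect_set($"status")), 1); treat that expression as an untested assumption.

```scala
// Sample rows copied from the df shown in the question.
val rows: Seq[(String, String)] = Seq(
  "Test1" -> "Success", "Test1" -> "Success",
  "Test2" -> "Failure", "Test2" -> "Success",
  "Test3" -> "Success", "Test3" -> "Success",
  "Test4" -> "Failure", "Test4" -> "Success"
)

// Distinct statuses per constraint, sorted ascending; the first element is
// "Failure" whenever a failure exists, because "Failure" < "Success".
val firstAfterSort: Map[String, String] =
  rows.groupBy(_._1).map { case (constraint, rs) =>
    constraint -> rs.map(_._2).distinct.sorted.head
  }
```

The trick silently breaks if a third status value (for example one starting with a letter before 'F') ever appears, which is why the array_contains version reads more clearly.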
Upvotes: 1
Reputation: 167
This should work efficiently:
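import org.apache.spark.sql.functions.{max, when}
import spark.implicits._

val df = spark
  .read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("tmp.csv")
  .withColumn("status_code", when($"status" === "Success", 0).otherwise(1)) // 0 = Success, 1 = anything else
  .groupBy("constraint")
  .agg(max("status_code").as("status_code")) // 1 if any row of the constraint failed
  .withColumn("status", when($"status_code" === 0, "Success").otherwise("Failure"))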
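The max-over-codes trick can be checked in plain Scala as well. This sketch (no Spark) uses the same rows as the sample CSV in this answer and the same encoding, 0 for "Success" and 1 for everything else; statusCode is a hypothetical helper standing in for the when/otherwise column.

```scala
// Rows from the pipe-delimited sample CSV in this answer.
val rows: Seq[(String, String)] = Seq(
  "Test1" -> "Success", "Test1" -> "Success", "Test1" -> "Success", "Test1" -> "Success",
  "Test2" -> "Success", "Test2" -> "Success", "Test2" -> "Failure", "Test2" -> "Success",
  "Test3" -> "Success", "Test3" -> "Failure"
)

// Mirrors when($"status" === "Success", 0).otherwise(1).
def statusCode(status: String): Int = if (status == "Success") 0 else 1

// Mirrors groupBy("constraint").agg(max("status_code")) and the final when/otherwise.
val statusByConstraint: Map[String, String] =
  rows.groupBy(_._1).map { case (constraint, rs) =>
    val worst = rs.map { case (_, s) => statusCode(s) }.max
    constraint -> (if (worst == 0) "Success" else "Failure")
  }
```

Because otherwise(1) catches every value other than "Success", an unexpected status (or a null) is treated as a failure, which is usually the safer default for validation results.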
Output:
df.show()
+----------+-----------+-------+
|constraint|status_code| status|
+----------+-----------+-------+
| Test2| 1|Failure|
| Test1| 0|Success|
| Test3| 1|Failure|
+----------+-----------+-------+
Format of the input csv I used:
constraint|status
Test1|Success
Test1|Success
Test1|Success
Test1|Success
Test2|Success
Test2|Success
Test2|Failure
Test2|Success
Test3|Success
Test3|Failure
Upvotes: 4