Reputation: 354
I'm learning Spark with Scala. I'm trying to populate some columns using a kind of CASE statement. Any help would be appreciated.
In the input DF I have the columns customer, order, type, message, message1, message2. message1 and message2 will always be null in the input DF. I want to publish message into message1 when type is 'V' and publish message into message2 when type is 'A'. In the output DF I should have only one record per customer.
DF1:
cust, order, type, message, message1, message2
c1, o1, V, Verified, null, null
c1, o1, A, Approved, null, null
c2, o2, A, Approved, null, null
c3, o3, V, Verified, null, null
outputDF:
cust, order, type, message, message1, message2
c1, o1, A, Approved, Verified, Approved
c2, o2, A, Approved, null, Approved
c3, o3, V, Verified, Verified, null
Upvotes: 0
Views: 1523
Reputation: 837
As suggested in other answers, you can use a when/otherwise clause to fill in the message1 and message2 values based on type. But to satisfy the last condition, i.e. only one row per customer, you can do something like the below:
import spark.implicits._
import org.apache.spark.sql.functions._

// The explicit tuple type lets the null fields infer as String.
val df = Seq[(String, String, String, String, String, String)](
  ("c1", "o1", "V", "Verified", null, null),
  ("c1", "o1", "A", "Approved", null, null),
  ("c2", "o2", "A", "Approved", null, null),
  ("c3", "o3", "V", "Verified", null, null)
).toDF("cust", "order", "type", "message", "message1", "message2")
val outputDf = df
  .groupBy($"cust", $"order")
  .agg(
    collect_list($"type").alias("type"),
    collect_list($"message").alias("message")
  )
  .withColumn("message1", when(size($"type") === 2, "Verified").when($"type"(0) === "V", $"message"(0)))
  .withColumn("message2", when(size($"type") === 2, "Approved").when($"type"(0) === "A", $"message"(0)))
  .withColumn("message", when(size($"type") === 2, lit("Approved")).otherwise($"message"(0)))
  .withColumn("type", when(size($"type") === 2, "A").otherwise($"type"(0)))
outputDf.show
which gives the below output:
+----+-----+----+--------+--------+--------+
|cust|order|type| message|message1|message2|
+----+-----+----+--------+--------+--------+
| c2| o2| A|Approved| null|Approved|
| c1| o1| A|Approved|Verified|Approved|
| c3| o3| V|Verified|Verified| null|
+----+-----+----+--------+--------+--------+
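The hard-coded "Verified"/"Approved" literals above assume exactly those two type values ever occur together. A more data-driven variant (just a sketch, using conditional aggregation on the same df; max ignores nulls, and outputDf2 is a name introduced here) would be:

```scala
import org.apache.spark.sql.functions._

// Pull each type's message into its own column; max skips nulls,
// so it keeps whichever message exists within the group.
val outputDf2 = df
  .groupBy($"cust", $"order")
  .agg(
    max(when($"type" === "V", $"message")).alias("message1"),
    max(when($"type" === "A", $"message")).alias("message2")
  )
  // An approved row wins when both types are present for a customer.
  .withColumn("type", when($"message2".isNotNull, "A").otherwise("V"))
  .withColumn("message", coalesce($"message2", $"message1"))
  .select("cust", "order", "type", "message", "message1", "message2")
```

This avoids indexing into the collected arrays and works even if a group has only one of the two types.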
Upvotes: 1
Reputation: 462
If message1 and message2 are just nulls, I would create the new columns using when/otherwise. If message1 and message2 do contain other values and you want to retain them, modify the below example a bit and use the existing message1 and message2 columns in the otherwise parameter.
import spark.implicits._
import org.apache.spark.sql.functions.when
// The explicit tuple type lets the null fields infer as String.
val inputDF = spark.createDataFrame(Seq[(String, String, String, String, String, String)](
  ("c1", "o1", "V", "Verified", null, null),
  ("c1", "o1", "A", "Approved", null, null),
  ("c2", "o2", "A", "Approved", null, null),
  ("c3", "o3", "V", "Verified", null, null)
)).toDF("customer", "order", "type", "message", "message1", "message2")
val newInputDF = inputDF.select("customer", "order", "type", "message")
val outputDF = newInputDF
  .withColumn("message1", when($"type" === "V", $"message")) // null when type is not "V"
  .withColumn("message2", when($"type" === "A", $"message")) // null when type is not "A"
outputDF.show()
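Note that this still leaves two rows for c1. If you also need the one-row-per-customer requirement from the question, one way (a sketch building on newInputDF above; max ignores nulls, and onePerCustDF is a name introduced here) is to aggregate:

```scala
import org.apache.spark.sql.functions._

// Collapse to one row per customer/order via conditional aggregation:
// max keeps the non-null message produced for each type branch.
val onePerCustDF = newInputDF
  .groupBy($"customer", $"order")
  .agg(
    max(when($"type" === "V", $"message")).alias("message1"),
    max(when($"type" === "A", $"message")).alias("message2")
  )
  .withColumn("type", when($"message2".isNotNull, "A").otherwise("V"))
  .withColumn("message", coalesce($"message2", $"message1"))
  .select("customer", "order", "type", "message", "message1", "message2")

onePerCustDF.show()
```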
Upvotes: 1