vamsi

Reputation: 354

Spark dataframe case when

I'm learning Spark with Scala. I'm trying to populate some columns using something like a CASE statement. Any help would be appreciated.

In the input DF I have the columns customer, order, type, message, message1, and message2. message1 and message2 will always be null in the input DF. I want to put message into message1 when type is 'V', and into message2 when type is 'A'. The output DF should have only one record per customer.

DF1: 
cust, order, type, message, message1, message2
c1, o1, V, Verified, null, null
c1, o1, A, Approved, null, null
c2, o2, A, Approved, null, null
c3, o3, V, Verified, null, null

outputDF:
cust, order, type, message, message1, message2
c1, o1, A, Approved, Verified, Approved
c2, o2, A, Approved, null, Approved
c3, o3, V, Verified, Verified, null

Upvotes: 0

Views: 1523

Answers (2)

wypul

Reputation: 837

As suggested in other answers, you can use a when/otherwise clause to fill message1 and message2 based on type. To satisfy the last condition, i.e. only one row per customer, you can do something like the following:

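import org.apache.spark.sql.functions.{collect_list, lit, size, when}
import spark.implicits._

// Typed null, so that message1/message2 are inferred as string columns.
val noMsg: String = null

val df = Seq(
  ("c1", "o1", "V", "Verified", noMsg, noMsg),
  ("c1", "o1", "A", "Approved", noMsg, noMsg),
  ("c2", "o2", "A", "Approved", noMsg, noMsg),
  ("c3", "o3", "V", "Verified", noMsg, noMsg)
).toDF("cust", "order", "type", "message", "message1", "message2")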

val outputDf = df
  .groupBy($"cust", $"order")
  .agg(collect_list($"type").alias("type"), collect_list($"message").alias("message"))
  .withColumn("message1", when(size($"type") === 2, "Verified").when($"type"(0) === "V", $"message"(0)))
  .withColumn("message2", when(size($"type") === 2, "Approved").when($"type"(0) === "A", $"message"(0)))
  .withColumn("message", when(size($"type") === 2, lit("Approved")).otherwise($"message"(0)))
  .withColumn("type", when(size($"type") === 2, "A").otherwise($"type"(0)))

outputDf.show

This gives the following output:

+----+-----+----+--------+--------+--------+
|cust|order|type| message|message1|message2|
+----+-----+----+--------+--------+--------+
|  c2|   o2|   A|Approved|    null|Approved|
|  c1|   o1|   A|Approved|Verified|Approved|
|  c3|   o3|   V|Verified|Verified|    null|
+----+-----+----+--------+--------+--------+
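The snippet above hardcodes "Verified" and "Approved" for groups with two rows. A possible alternative is conditional aggregation, sketched below for spark-shell or a Scala script. It is only a sketch under two assumptions that are not stated in the question: message is never null, and the 'A' row takes precedence for type and message when a customer has both rows. The names pivoted and result are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit, max, when}

// In spark-shell a session already exists; getOrCreate() simply returns it.
val spark = SparkSession.builder().master("local[*]").appName("case-when-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  ("c1", "o1", "V", "Verified"),
  ("c1", "o1", "A", "Approved"),
  ("c2", "o2", "A", "Approved"),
  ("c3", "o3", "V", "Verified")
).toDF("cust", "order", "type", "message")

// when(...) without otherwise yields null for non-matching rows, and max ignores
// nulls, so each aggregate keeps the message of the matching type (or null).
val pivoted = df
  .groupBy($"cust", $"order")
  .agg(
    max(when($"type" === "V", $"message")).alias("message1"),
    max(when($"type" === "A", $"message")).alias("message2")
  )

// Assumption: if an 'A' row exists, it decides type and message; otherwise 'V' does.
val result = pivoted
  .withColumn("type", when($"message2".isNotNull, lit("A")).otherwise(lit("V")))
  .withColumn("message", coalesce($"message2", $"message1"))
  .select("cust", "order", "type", "message", "message1", "message2")

result.orderBy("cust").show()
```

Because the aggregates do not depend on the order in which rows arrive within a group, this avoids indexing into the arrays produced by collect_list.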

Upvotes: 1

Abhishek

Reputation: 462

If message1 and message2 are always null, I would just create new columns using when/otherwise. If message1 and message2 do contain other values and you want to retain them, you can modify the example below slightly and pass the existing message1 and message2 columns to otherwise.

  import spark.implicits._
  import org.apache.spark.sql.functions.when

  // Typed null, so the columns hold real nulls rather than the string "null".
  val noMsg: String = null
  val inputDF = spark.createDataFrame(Seq(
    ("c1", "o1", "V", "Verified", noMsg, noMsg),
    ("c1", "o1", "A", "Approved", noMsg, noMsg),
    ("c2", "o2", "A", "Approved", noMsg, noMsg),
    ("c3", "o3", "V", "Verified", noMsg, noMsg)
  )).toDF("customer", "order", "type", "message", "message1", "message2")

  // Drop the empty message1/message2 columns and rebuild them from type and message.
  val newInputDF = inputDF.select("customer", "order", "type", "message")
  val outputDF = newInputDF
    // when without otherwise leaves non-matching rows as null.
    .withColumn("message1", when($"type" === "V", $"message"))
    .withColumn("message2", when($"type" === "A", $"message"))
  outputDF.show()
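The variant that retains existing values could look roughly like the sketch below, written for spark-shell or a Scala script. The input rows and the values "kept1" and "kept2" are hypothetical and exist only to show that a non-matching row keeps whatever was already in message1 or message2.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// In spark-shell a session already exists; getOrCreate() simply returns it.
val spark = SparkSession.builder().master("local[*]").appName("retain-sketch").getOrCreate()
import spark.implicits._

// Typed null so the tuple fields are inferred as String.
val noValue: String = null

// Hypothetical input in which message1/message2 already carry some values.
val inputDF = Seq(
  ("c1", "o1", "V", "Verified", noValue, "kept2"),
  ("c2", "o2", "A", "Approved", "kept1", noValue)
).toDF("customer", "order", "type", "message", "message1", "message2")

// Overwrite a column only when the type matches; otherwise fall back to
// the value that was already stored in that column.
val outputDF = inputDF
  .withColumn("message1", when($"type" === "V", $"message").otherwise($"message1"))
  .withColumn("message2", when($"type" === "A", $"message").otherwise($"message2"))

outputDF.show()
```

Like the example above, this keeps one row per input row; it does not collapse the rows of a customer into a single record.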

Upvotes: 1
