Reputation: 265
I have a requirement to fill in all values (categories) for a column, as shown in the table below. For example, I want a way to fill in the 'UNSEEN' and 'ASSIGNED' categories for code HL_14108.
val df = Seq(
("HL_13203","DELIVERED",3226),
("HL_13203","UNSEEN",249),
("HL_13203","UNDELIVERED",210),
("HL_13203","ASSIGNED",2),
("HL_14108","DELIVERED",3083),
("HL_14108","UNDELIVERED",164),
("HL_14108","PICKED",1)).toDF("code","status","count")
Input:
+--------+-----------+-----+
| code| status|count|
+--------+-----------+-----+
|HL_13203| DELIVERED| 3226|
|HL_13203| UNSEEN| 249|
|HL_13203|UNDELIVERED| 210|
|HL_13203| ASSIGNED| 2|
|HL_14108| DELIVERED| 3083|
|HL_14108|UNDELIVERED| 164|
|HL_14108| PICKED| 1|
+--------+-----------+-----+
Expected output:
+--------+-----------+-----+
| code| status|count|
+--------+-----------+-----+
|HL_13203| DELIVERED| 3226|
|HL_13203| UNSEEN| 249|
|HL_13203|UNDELIVERED| 210|
|HL_13203| ASSIGNED| 2|
|HL_13203| PICKED| 0|
|HL_14108| DELIVERED| 3083|
|HL_14108|UNDELIVERED| 164|
|HL_14108| PICKED| 1|
|HL_14108| UNSEEN| 0|
|HL_14108| ASSIGNED| 0|
+--------+-----------+-----+
I want to add the missing category rows for each code. What would be the correct approach to do that in Apache Spark?
Upvotes: 1
Views: 1271
Reputation: 28422
First create a new dataframe with all possible combinations of the code and status columns. This can be done in different ways, but the most straightforward is through a cross join:
val states = df.select("status").dropDuplicates()
val codes = df.select("code").dropDuplicates()
val df2 = codes.crossJoin(states)
A better approach is to first collect all possible states and then use explode with typedLit (available from Spark 2.2+). This results in the same dataframe:
val states = df.select("status").dropDuplicates().as[String].collect()
val codes = df.select("code").dropDuplicates()
val df2 = codes.withColumn("status", explode(typedLit(states)))
For older Spark versions, the same functionality as typedLit can be achieved with array(states.map(lit(_)): _*).
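Put together, the pre-2.2 variant could look like this (a sketch, reusing the states array and codes dataframe collected above):

```scala
// Sketch for Spark < 2.2: build an array column from literal values,
// then explode it into one row per status for each code.
val df2 = codes.withColumn("status", explode(array(states.map(lit(_)): _*)))
```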
Then, join this new dataframe with the original one to obtain the count column. Rows with no count value will be null, so na.fill(0) is used to set these to 0:
df2.join(df, Seq("code", "status"), "left").na.fill(0)
Resulting dataframe:
+--------+-----------+-----+
| code| status|count|
+--------+-----------+-----+
|HL_13203|UNDELIVERED| 210|
|HL_13203| ASSIGNED| 2|
|HL_13203| UNSEEN| 249|
|HL_13203| PICKED| 0|
|HL_13203| DELIVERED| 3226|
|HL_14108|UNDELIVERED| 164|
|HL_14108| ASSIGNED| 0|
|HL_14108| UNSEEN| 0|
|HL_14108| PICKED| 1|
|HL_14108| DELIVERED| 3083|
+--------+-----------+-----+
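The core idea (cross product of keys, then a left lookup with a default of 0) can also be illustrated with plain Scala collections, independent of Spark. This is only a sketch on a small illustrative subset of the data, not part of the Spark solution above:

```scala
// Plain-Scala sketch: enumerate every (code, status) pair, then look
// up the observed count, defaulting to 0 for missing combinations.
val observed = Map(
  ("HL_13203", "DELIVERED") -> 3226,
  ("HL_13203", "UNSEEN")    -> 249,
  ("HL_14108", "DELIVERED") -> 3083,
  ("HL_14108", "PICKED")    -> 1
)
val codes  = observed.keys.map(_._1).toSeq.distinct
val states = observed.keys.map(_._2).toSeq.distinct
// Cross product of codes and states, filled with observed or 0.
val filled = for (c <- codes; s <- states)
  yield (c, s, observed.getOrElse((c, s), 0))
```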
Upvotes: 3