Lalit Lakhotia

Reputation: 265

Filling missing values in rows using Apache Spark

I have a requirement to fill in all values (categories) of a column for each code. For example, as shown in the table below, I want a way to fill in the 'UNSEEN' and 'ASSIGNED' categories for code HL_14108.

val df = Seq(
("HL_13203","DELIVERED",3226), 
("HL_13203","UNSEEN",249),     
("HL_13203","UNDELIVERED",210),
("HL_13203","ASSIGNED",2),    
("HL_14108","DELIVERED",3083), 
("HL_14108","UNDELIVERED",164),
("HL_14108","PICKED",1)).toDF("code","status","count")

Input:

+--------+-----------+-----+
|    code|     status|count|
+--------+-----------+-----+
|HL_13203|  DELIVERED| 3226|
|HL_13203|     UNSEEN|  249|
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_14108|  DELIVERED| 3083|
|HL_14108|UNDELIVERED|  164|
|HL_14108|     PICKED|    1|
+--------+-----------+-----+

Expected output:

+--------+-----------+-----+
|    code|     status|count|
+--------+-----------+-----+
|HL_13203|  DELIVERED| 3226|
|HL_13203|     UNSEEN|  249|
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_13203|     PICKED|    0|
|HL_14108|  DELIVERED| 3083|
|HL_14108|UNDELIVERED|  164|
|HL_14108|     PICKED|    1|
|HL_14108|     UNSEEN|    0|
|HL_14108|   ASSIGNED|    0|
+--------+-----------+-----+

I want to add the missing category rows, with a count of 0, for each code. What would be the correct approach to do that in Apache Spark?

Upvotes: 1

Views: 1271

Answers (1)

Shaido

Reputation: 28422

First, create a new dataframe with all possible combinations of the code and status columns. This can be done in several ways, but the most straightforward is through a cross join:

// Distinct statuses and codes, then every pairing via a cross join
val states = df.select("status").dropDuplicates()
val codes = df.select("code").dropDuplicates()
val df2 = codes.crossJoin(states)
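
On the sample data this produces 2 distinct codes × 5 distinct statuses = 10 combinations. As a purely illustrative sanity check:

// Illustrative check: the cross join should yield one row per code/status pair
assert(df2.count() == codes.count() * states.count())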

A more efficient approach is to first collect all possible states and then use explode with typedLit (available from Spark version 2.2+), which avoids the cross join entirely. This results in the same dataframe:

import org.apache.spark.sql.functions.{explode, typedLit}

val states = df.select("status").dropDuplicates().as[String].collect()
val codes = df.select("code").dropDuplicates()
// One row per code/status pair via explode over the literal status array
val df2 = codes.withColumn("status", explode(typedLit(states)))

For older Spark versions, the same functionality as typedLit can be achieved using array(states.map(lit(_)): _*).
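
Spelled out, that pre-2.2 variant might look like the following (same collected states array as above):

import org.apache.spark.sql.functions.{array, explode, lit}

// Spark < 2.2: build the literal array column manually instead of typedLit
val df2 = codes.withColumn("status", explode(array(states.map(lit(_)): _*)))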


Then, join this new dataframe with the old one to obtain the count column. Rows with no matching count value will be null, so na.fill(0) is used to set these to 0:

df2.join(df, Seq("code", "status"), "left").na.fill(0)
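
As a side note, if the joined result could contain other nullable columns, the fill can be limited to the count column; this is an optional refinement, not something the sample data requires:

// Optional: zero-fill only the count column
val result = df2.join(df, Seq("code", "status"), "left").na.fill(0, Seq("count"))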

Resulting dataframe:

+--------+-----------+-----+
|    code|     status|count|
+--------+-----------+-----+
|HL_13203|UNDELIVERED|  210|
|HL_13203|   ASSIGNED|    2|
|HL_13203|     UNSEEN|  249|
|HL_13203|     PICKED|    0|
|HL_13203|  DELIVERED| 3226|
|HL_14108|UNDELIVERED|  164|
|HL_14108|   ASSIGNED|    0|
|HL_14108|     UNSEEN|    0|
|HL_14108|     PICKED|    1|
|HL_14108|  DELIVERED| 3083|
+--------+-----------+-----+
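
Note that the join does not preserve the input order, so the rows above appear in a different order than the expected output. If a deterministic order matters, an explicit sort can be appended (a cosmetic step, separate from the fill logic):

// Optional: sort for a stable display order
df2.join(df, Seq("code", "status"), "left").na.fill(0).orderBy("code", "status").show()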

Upvotes: 3
