Conditional replacement of values in a PySpark dataframe

I have the spark dataframe below:

+----------+-------------+--------------+------------+----------+-------------------+
|      part|      company|       country|        city|     price|               date|
+----------+-------------+--------------+------------+----------+-------------------+
| 52125-136|  Brainsphere|          null|       Braga|   493.94€|2016-05-10 11:13:43|
| 70253-307|Chatterbridge|         Spain|   Barcelona|   969.29€|2016-05-10 13:06:30|
| 50563-113|     Kanoodle|         Japan|     Niihama| ¥72909.95|2016-05-10 13:11:57|
|52380-1102|    Flipstorm|        France|    Nanterre|   794.84€|2016-05-10 13:19:12|
| 54473-578|  Twitterbeat|        France|      Annecy|   167.48€|2016-05-10 15:09:46|
| 76335-006|        Ntags|      Portugal|      Lisbon|   373.07€|2016-05-10 15:20:22|
| 49999-737|     Buzzbean|       Germany|  Düsseldorf|    861.2€|2016-05-10 15:21:51|
| 68233-011|    Flipstorm|        Greece|      Athens|   512.89€|2016-05-10 15:22:03|
| 36800-952|       Eimbee|        France|      Amiens|   219.74€|2016-05-10 21:22:46|
| 16714-295|      Teklist|          null|      Arnhem|    624.4€|2016-05-10 21:57:15|
| 42254-213|   Thoughtmix|      Portugal|     Amadora|   257.99€|2016-05-10 22:01:04|
+----------+-------------+--------------+------------+----------+-------------------+

Of these columns, only the country column has null values. What I want to do is fill the null values with the country that corresponds to the city in the column to the right. The dataframe is large, and there are cases where Braga (for example) appears with the country it belongs to, and other rows where the country is missing.

So, how can I fill those null values in the country column based on the city column, while still taking advantage of Spark's parallel computation?

Upvotes: 0

Views: 620

Answers (2)

Steven

Reputation: 15318

You can use a window function for that: partition by city and take the first non-null country within each partition (pass ignorenulls=True so that a null country in the same city is not picked).

from pyspark.sql import functions as F, Window


df.withColumn(
    "country",
    F.coalesce(
        F.col("country"),
        F.first("country").over(Window.partitionBy("city").orderBy("city")),
    ),
).show()
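
For example, on a small made-up dataframe (assuming an active SparkSession named spark; the rows below are illustrative, not taken from the question's data), the null Braga row picks up the country from the other Braga row:

data = [("Braga", None), ("Braga", "Portugal"), ("Barcelona", "Spain")]
df_example = spark.createDataFrame(data, ["city", "country"])

df_example.withColumn(
    "country",
    F.coalesce(
        F.col("country"),
        F.first("country", ignorenulls=True).over(Window.partitionBy("city")),
    ),
).show()
#+---------+--------+
#|     city| country|
#+---------+--------+
#|Barcelona|   Spain|
#|    Braga|Portugal|
#|    Braga|Portugal|
#+---------+--------+
# (row order may vary)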

Upvotes: 1

notNull

Reputation: 31540

Use the coalesce function in Spark to get the first non-null value from a list of columns.

Example:

df.show()
#+--------+---------+
#| country|     city|
#+--------+---------+
#|    null|    Braga|
#|   Spain|Barcelona|
#|    null|   Arnhem|
#|portugal|  Amadora|
#+--------+---------+

from pyspark.sql.functions import coalesce, col

df.withColumn("country", coalesce(col("country"), col("city"))).show()
#+--------+---------+
#| country|     city|
#+--------+---------+
#|   Braga|    Braga|
#|   Spain|Barcelona|
#|  Arnhem|   Arnhem|
#|portugal|  Amadora|
#+--------+---------+
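
Note that this fills the missing country with the city name itself (Braga, Arnhem). If you need the actual country, one option (just a sketch, assuming the same city appears on other rows with a non-null country; known_country is only an illustrative alias) is to build a city-to-country lookup from those rows and coalesce against it after a left join:

from pyspark.sql.functions import coalesce, col

# city -> country mapping derived from the rows where country is known
lookup = (
    df.filter(col("country").isNotNull())
    .select("city", col("country").alias("known_country"))
    .dropDuplicates(["city"])
)

df.join(lookup, on="city", how="left") \
  .withColumn("country", coalesce(col("country"), col("known_country"))) \
  .drop("known_country") \
  .show()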

Upvotes: 1
