gforce91
gforce91

Reputation: 161

Fill column value based on join in Pyspark dataframe

I have a dataframe using the code

df = sc.parallelize([
    (123, 2345,25,""), (123, 2345,29,"NY"), (123,5422,67,"NY"),(123,9422,67,"NY"),(123,3581,98,"NY"),(231, 4322,77,""),(231,4322,99,"Paris"),(231,8342,45,"Paris")
]).toDF(["userid", "transactiontime","zip","location"])
+------+---------------+---+--------+
|userid|transactiontime|zip|location|
+------+---------------+---+--------+
|   123|           2345| 25|        |
|   123|           2345| 29|      NY|
|   123|           5422| 67|      NY|
|   123|           9422| 67|      NY|
|   123|           3581| 98|      NY|
|   231|           4322| 77|        |
|   231|           4322| 99|   Paris|
|   231|           8342| 45|   Paris|
+------+---------------+---+--------+

I want the output to be like this

+------+---------------+---+--------+
|userid|transactiontime|zip|location|
+------+---------------+---+--------+
|   123|           2345| 25|      NY|
|   123|           2345| 29|      NY|
|   123|           5422| 67|      NY|
|   123|           9422| 67|      NY|
|   123|           3581| 98|      NY|
|   231|           4322| 77|   Paris|
|   231|           4322| 99|   Paris|
|   231|           8342| 45|   Paris|
+------+---------------+---+--------+

I want to join on userid and transactiontime and fill the city column with the non-null values.

I have tried window function like this

w1 = Window.partitionBy('userid', 'transactiontime').orderBy(col('zip'))

df_new = df.withColumn("newlocation", F.last('location').over(w1))
print(df_new.show())

But this didn't work and i've tried self join as well but couldn't work that as well. Any help ??

Upvotes: 0

Views: 430

Answers (1)

vinsce
vinsce

Reputation: 1338

The first and last windowing functions accept an optional ignorenulls parameter which may be helpful in this case. However in your example you actually don't have null values but empty strings, which is different.

w = Window.partitionBy('userid', 'transactiontime')

df_new = df \
    .withColumn("fixedLoc", F.when(F.col("location") == "", None).otherwise(F.col("location"))) \
    .withColumn("newLoc", F.first('fixedLoc', ignorenulls=True).over(w))

In the above solution, a temporary column is used to replace empty strings with null values, then first with ignorenulls is used on the new column.

As an alternative solution you can use the max function that will ignore null values and will prioritize non-empty strings:

df_new = df \
    .withColumn("newLoc", F.max('location').over(w))

Upvotes: 1

Related Questions