User12345

Reputation: 5480

Overwrite column values using other column values based on conditions in PySpark

I have a data frame in pyspark like below.

df.show()

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|        null|       Poland|
|     815518|       MA401|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
+-----------+------------+-------------+

I have another data frame, df1, like below.

df1.show()

+-----------+------------+
|customer_id|product_name|
+-----------+------------+
|   12870946|     GS748TS|
|     815518|       MA402|
|    3138420|        null|
|    3178864|    WGR614v6|
|    7456796|       XE102|
|   21893468|     AGM731F|
|       null|       AE171|
+-----------+------------+
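
For anyone reproducing this, a minimal sketch of how the two sample frames could be created (assuming the nulls shown are real nulls rather than the string "null"):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows mirroring the tables above; None produces a real null
df = spark.createDataFrame(
    [(12870946, None, 'Poland'),
     (815518, 'MA401', 'United States'),
     (3138420, 'WG111v2', 'UK'),
     (3178864, 'WGR614v6', 'United States'),
     (7456796, 'XE102', 'United States'),
     (21893468, 'AGM731F', 'United States')],
    ['customer_id', 'product_name', 'country']
)

df1 = spark.createDataFrame(
    [(12870946, 'GS748TS'),
     (815518, 'MA402'),
     (3138420, None),
     (3178864, 'WGR614v6'),
     (7456796, 'XE102'),
     (21893468, 'AGM731F'),
     (None, 'AE171')],
    ['customer_id', 'product_name']
)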

Now I want to do a full outer join on these tables and update the product_name column values as below.

1) Overwrite the values in `df` with the values in `df1` whenever `df1` has a value.
2) If there are `null` values or no matching values in `df1`, leave the values in `df` as they are.

Expected result:

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|     GS748TS|       Poland|
|     815518|       MA402|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
|       null|       AE171|         null|
+-----------+------------+-------------+

I have done it like below:

import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )

But the result I am getting is different:

df2.show()

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|        null|       Poland|
|     815518|       MA401|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
|       null|       AE171|         null|
+-----------+------------+-------------+

How can I get the expected result?

Upvotes: 6

Views: 3329

Answers (3)

AiDev

Reputation: 1234

Since there are some conflicting reports: first, just create a new column in df1 containing the column from the other data frame that you want to use (assuming your data frames have the same dimensions; join them as necessary if not). Then you can use SQL conditionals.

from pyspark.sql import functions as F
# keep the existing value, falling back to the column brought over from the other data frame when it is null
df1 = df1.withColumn('column', F.when(df1['column'].isNull(), df1['other-column-originally-from-df2']).otherwise(df1['column']))
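
Applied to the data frames from the question, that pattern might look roughly like this (a sketch only; the temporary column name product_name_from_df1 is a placeholder, and it assumes the nulls in df are real nulls and that df's value should be kept whenever it is present):

import pyspark.sql.functions as F

# bring df1's product_name into the join under a temporary name
joined = df.join(
    df1.withColumnRenamed('product_name', 'product_name_from_df1'),
    on='customer_id', how='full_outer'
)

# keep df's product_name, falling back to df1's only where df's is null
result = joined.withColumn(
    'product_name',
    F.when(F.col('product_name').isNull(), F.col('product_name_from_df1'))
     .otherwise(F.col('product_name'))
).drop('product_name_from_df1')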

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

Your code is perfect if the values are not the string "null". But looking at the df2 dataframe, the values you are getting in product_name seem to be the string "null". You will have to check for the string "null" using the when inbuilt function and the isnull inbuilt function, as below:

import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(df.customer_id, f.when(f.isnull(df.product_name) | (df.product_name == "null"), df1.product_name).otherwise(df.product_name).alias('product_name'), df.country)
df2.show(truncate=False)

which should give you

+-----------+------------+------------+
|customer_id|product_name|country     |
+-----------+------------+------------+
|7456796    |XE102       |UnitedStates|
|3178864    |WGR614v6    |UnitedStates|
|815518     |MA401       |UnitedStates|
|3138420    |WG111v2     |UK          |
|12870946   |GS748TS     |Poland      |
|21893468   |AGM731F     |UnitedStates|
|null       |AE171       |null        |
+-----------+------------+------------+
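
If the string "null" values are just an artifact of how df was loaded, another option is to normalize them to real nulls first, after which the coalesce-based join from the question works unchanged. A rough sketch (df_clean is a placeholder name):

import pyspark.sql.functions as f

# turn the literal string "null" into a real null so coalesce can skip over it
df_clean = df.withColumn(
    'product_name',
    f.when(f.col('product_name') == 'null', f.lit(None)).otherwise(f.col('product_name'))
)

df2 = df_clean.join(df1, df_clean.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df_clean.customer_id,
        f.coalesce(df_clean.product_name, df1.product_name).alias('product_name'),
        df_clean.country
    )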

Upvotes: 3

pault

Reputation: 43494

The code you wrote produces the correct output for me, so I cannot reproduce your problem. I have seen other posts where using an alias when doing joins resolved issues, so here is a slightly modified version of your code that does the same thing:

import pyspark.sql.functions as f

df.alias("r").join(df1.alias("l"), on="customer_id", how='full_outer')\
    .select(
        "customer_id",
        f.coalesce("r.product_name", "l.product_name").alias('product_name'),
        "country"
    )\
    .show()
#+-----------+------------+-------------+
#|customer_id|product_name|      country|
#+-----------+------------+-------------+
#|    7456796|       XE102|United States|
#|    3178864|    WGR614v6|United States|
#|       null|       AE171|         null|
#|     815518|       MA401|United States|
#|    3138420|     WG111v2|           UK|
#|   12870946|     GS748TS|       Poland|
#|   21893468|     AGM731F|United States|
#+-----------+------------+-------------+

I get the same results when I run your code as well (reproduced below):

df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )\
    .show()

I am using Spark 2.1 and Python 2.7.13.
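
One thing worth double-checking against the expected table: requirement 1 says a value from df1 should win whenever df1 has one (e.g. 815518 should become MA402), which would mean coalescing df1's column first. A sketch of that variant, reusing the aliases above:

import pyspark.sql.functions as f

df.alias("r").join(df1.alias("l"), on="customer_id", how="full_outer")\
    .select(
        "customer_id",
        f.coalesce("l.product_name", "r.product_name").alias("product_name"),
        "country"
    )\
    .show()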

Upvotes: 6
