Reputation: 5480
I have a data frame in PySpark like below.
df.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
+-----------+------------+-------------+
I have another data frame like below.
df1.show()
+-----------+------------+
|customer_id|product_name|
+-----------+------------+
| 12870946| GS748TS|
| 815518| MA402|
| 3138420| null|
| 3178864| WGR614v6|
| 7456796| XE102|
| 21893468| AGM731F|
| null| AE171|
+-----------+------------+
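For anyone who wants to reproduce this, the two data frames can be built with something like the following minimal sketch (the values are copied from the show() output above; spark is a standard SparkSession):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# None becomes a real (SQL) null in the resulting data frames.
df = spark.createDataFrame([
    (12870946, None, 'Poland'),
    (815518, 'MA401', 'United States'),
    (3138420, 'WG111v2', 'UK'),
    (3178864, 'WGR614v6', 'United States'),
    (7456796, 'XE102', 'United States'),
    (21893468, 'AGM731F', 'United States'),
], ['customer_id', 'product_name', 'country'])

df1 = spark.createDataFrame([
    (12870946, 'GS748TS'),
    (815518, 'MA402'),
    (3138420, None),
    (3178864, 'WGR614v6'),
    (7456796, 'XE102'),
    (21893468, 'AGM731F'),
    (None, 'AE171'),
], ['customer_id', 'product_name'])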
Now I want to do a full outer join on these tables and update the product_name column values as follows:
1) Overwrite the values in `df` using the values in `df1` if there are values in `df1`.
2) If the values in `df1` are `null` or missing, leave the values in `df` as they are.
Expected result:
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| GS748TS| Poland|
| 815518| MA402|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
Here is what I have tried:
import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(df.customer_id,
            f.coalesce(df.product_name, df1.product_name).alias('product_name'),
            df.country)
But the result I get is different:
df2.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
How can I get the expected result?
Upvotes: 6
Views: 3329
Reputation: 1234
Since there are some conflicting reports: first, create a new column in `df1` holding the column from `df2` that you want to use (assuming your data frames have the same dimensions; join them as necessary if not). Then you can use SQL conditionals.
from pyspark.sql import functions as F

# Keep df1's own value unless it is null; fall back to the column brought in from df2.
df1 = df1.withColumn(
    'column',
    F.when(df1['column'].isNull(), df1['other-column-originally-from-df2'])
     .otherwise(df1['column']))
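Applied to the question's data frames, the whole pattern might look like this (just a sketch; `product_name_old` is a temporary name chosen here for illustration):
from pyspark.sql import functions as F

# Bring df's product_name into the join under a temporary name.
joined = df1.join(
    df.select('customer_id',
              F.col('product_name').alias('product_name_old'),
              'country'),
    on='customer_id', how='full_outer')

# Keep df1's value unless it is null; otherwise fall back to df's.
result = joined.withColumn(
    'product_name',
    F.when(F.col('product_name').isNull(), F.col('product_name_old'))
     .otherwise(F.col('product_name'))
).drop('product_name_old')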
Upvotes: 1
Reputation: 41957
Your code is perfect if the values are real nulls. But looking at the df2 data frame, the values you are getting in product_name seem to be the string "null". You will have to check for the string "null" as well, using the inbuilt `when` and `isnull` functions:
import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(df.customer_id,
            f.when(f.isnull(df.product_name) | (df.product_name == "null"), df1.product_name)
             .otherwise(df.product_name)
             .alias('product_name'),
            df.country)
df2.show(truncate=False)
which should give you
+-----------+------------+-------------+
|customer_id|product_name|country      |
+-----------+------------+-------------+
|7456796    |XE102       |United States|
|3178864    |WGR614v6    |United States|
|815518     |MA401       |United States|
|3138420    |WG111v2     |UK           |
|12870946   |GS748TS     |Poland       |
|21893468   |AGM731F     |United States|
|null       |AE171       |null         |
+-----------+------------+-------------+
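Alternatively (a sketch, not tested against your exact data), you can normalize the string "null" values back to real nulls first, after which your original coalesce approach works unchanged:
import pyspark.sql.functions as f

# Turn the literal string 'null' in product_name into a real SQL null.
df = df.withColumn(
    'product_name',
    f.when(f.col('product_name') == 'null', f.lit(None))
     .otherwise(f.col('product_name')))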
Upvotes: 3
Reputation: 43494
The code you wrote produces the correct output for me, so I cannot reproduce your problem. I have seen other posts where using an alias when doing joins resolved issues, so here is a slightly modified version of your code that does the same thing:
import pyspark.sql.functions as f
df.alias("r").join(df1.alias("l"), on="customer_id", how='full_outer')\
.select(
"customer_id",
f.coalesce("r.product_name", "l.product_name").alias('product_name'),
"country"
)\
.show()
#+-----------+------------+-------------+
#|customer_id|product_name| country|
#+-----------+------------+-------------+
#| 7456796| XE102|United States|
#| 3178864| WGR614v6|United States|
#| null| AE171| null|
#| 815518| MA401|United States|
#| 3138420| WG111v2| UK|
#| 12870946| GS748TS| Poland|
#| 21893468| AGM731F|United States|
#+-----------+------------+-------------+
I get the same results when I run your code as well (reproduced below):
df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )\
    .show()
I am using Spark 2.1 and Python 2.7.13.
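If you still see the unexpected output, it might be worth checking whether those entries are real nulls or the literal string "null" (as suggested in another answer); a quick diagnostic sketch:
# Count real SQL nulls vs. rows holding the literal string 'null'.
print(df.filter(f.col('product_name').isNull()).count())
print(df.filter(f.col('product_name') == 'null').count())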
Upvotes: 6