Arijit Mondal

Reputation: 73

DataFrame join not working in Spark 2.4.5

I am trying to join two dataframes in PySpark as below:

df1:
+----------+----------+--------------------+-----+
|FIRST_NAME| LAST_NAME|        COMPANY_NAME|CCODE|
+----------+----------+--------------------+-----+
|  Rebbecca|     Didio|Brandt, Jonathan ...|   AU|
|    Stevie|     Hallo|Landrum Temporary...|   US|
|    Mariko|    Stayer| Inabinet, Macre Esq|   BR|
|   Gerardo|    Woodka|Morris Downing & ...|   US|
|     Mayra|      Bena|  Buelt, David L Esq|   CN|
|    Idella|  Scotland|Artesian Ice & Co...|   UK|
|   Sherill|      Klar|        Midway Hotel|   CA|
+----------+----------+--------------------+-----+

df2:
+--------------------+-----------+
|             COUNTRY|COUNTRYCODE|
+--------------------+-----------+
|      United Kingdom|         UK|
|       United States|         US|
|United Arab Emirates|         AE|
|              Canada|         CA|
|              Brazil|         BR|
|               India|         IN|
+--------------------+-----------+

I am trying to join the two dataframes on df1.CCODE == df2.COUNTRYCODE, but it's not working:

df1 = df1.alias('df1')
df2 = df2.alias('df2')
tgt_tbl_col='COUNTRYCODE'
src_tbl_col='CCODE'
join_type = 'INNER'
merge_df = df1.join(df2, df2.tgt_tbl_col == df1.src_tbl_col, how=join_type)

Error:

AttributeError: 'DataFrame' object has no attribute 'tgt_tbl_col'
/databricks/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
   1332         if name not in self.columns:
   1333             raise AttributeError(
-> 1334                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1335         jc = self._jdf.apply(name)
   1336         return Column(jc)

However, the same join works when I make both column names the same and run the following:

merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)

Need suggestions on this.

Version: Apache Spark 2.4.5, Scala 2.11, Python 3.8

Upvotes: 1

Views: 4419

Answers (2)

Karthik

Reputation: 1171

Please note that tgt_tbl_col is not a column name in df2; df2.tgt_tbl_col looks for a column literally named tgt_tbl_col, hence it throws an AttributeError.

Instead, you can do it as below:

from pyspark.sql.functions import col
merge_df = df1.join(df2, col(tgt_tbl_col) == col(src_tbl_col), how=join_type)
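As a side note (a minimal sketch, not part of the original answer): bracket indexing also resolves a column name held in a variable, and it keeps each column explicitly tied to its dataframe, which avoids ambiguity if both sides ever share a column name:

# Bracket lookup resolves the variable's value as a column name,
# unlike attribute access (df2.tgt_tbl_col), which looks for a column
# literally called 'tgt_tbl_col'
merge_df = df1.join(df2, df1[src_tbl_col] == df2[tgt_tbl_col], how=join_type)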

Alternatively, you can write it directly like this:

merge_df = df1.join(df2, df1.CCODE == df2.COUNTRYCODE, "inner")

Note: it's fine if you don't specify "inner" in the statement above, as Spark considers an inner join by default.
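For example (a minimal sketch using the same dataframes as above):

# No join type given, so Spark performs an inner join by default
merge_df = df1.join(df2, df1.CCODE == df2.COUNTRYCODE)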

You also said in your question that the following works: merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)

In that case, you are passing tgt_tbl_col (i.e. 'COUNTRYCODE') as a string; it will work only if you rename the CCODE column in df1 to COUNTRYCODE, otherwise it will throw an error.

Note: if both tables use the same name for the join column, you can pass just that column name as a string instead of writing a condition with both tables' columns.
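To illustrate that route, a minimal sketch (the df1_renamed name is just for illustration):

# Rename df1's CCODE so both sides share COUNTRYCODE, then join on the
# common name; the join key appears only once in the result
df1_renamed = df1.withColumnRenamed('CCODE', 'COUNTRYCODE')
merge_df = df1_renamed.join(df2, on='COUNTRYCODE', how='inner')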

Upvotes: 1

HArdRe537

Reputation: 126

Your code is failing because Python treats tgt_tbl_col as a literal attribute name instead of resolving the variable to "COUNTRYCODE", so df2.tgt_tbl_col raises an error (as would df1.src_tbl_col). Try the code below, which uses the col function and works. If you are fine with renaming the df1 column CCODE to COUNTRYCODE, then @Karthik's answer will be fine.

Note: when building the condition, make sure col1 and col2 follow the order of the join, i.e. df1.join(df2, ...), so col1 must be a df1 column and col2 must be a df2 column.

from pyspark.sql import functions as sf

# Column names held in variables are resolved with sf.col,
# not attribute access
col1 = "CCODE"        # join column in df1
col2 = "COUNTRYCODE"  # join column in df2

condition = sf.col(col1) == sf.col(col2)
join_type = "inner"

df1.join(df2, condition, how=join_type).show()
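One follow-up worth noting (an assumption beyond the original answer, not something the asker requested): after joining on two differently named columns, both CCODE and COUNTRYCODE remain in the result, so you may want to drop one of them:

# Drop the redundant country-code column that came from df2
df1.join(df2, condition, how=join_type).drop(df2[col2]).show()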

Upvotes: 0
