Reputation: 73
I am trying to join two DataFrames in PySpark as below:
df1:
+----------+----------+--------------------+-----+
|FIRST_NAME| LAST_NAME| COMPANY_NAME|CCODE|
+----------+----------+--------------------+-----+
| Rebbecca| Didio|Brandt, Jonathan ...| AU|
| Stevie| Hallo|Landrum Temporary...| US|
| Mariko| Stayer| Inabinet, Macre Esq| BR|
| Gerardo| Woodka|Morris Downing & ...| US|
| Mayra| Bena| Buelt, David L Esq| CN|
| Idella| Scotland|Artesian Ice & Co...| UK|
| Sherill| Klar| Midway Hotel| CA|
+----------+----------+--------------------+-----+
df2:
+--------------------+-----------+
| COUNTRY|COUNTRYCODE|
+--------------------+-----------+
| United Kingdom| UK|
| United States| US|
|United Arab Emirates| AE|
| Canada| CA|
| Brazil| BR|
| India| IN|
+--------------------+-----------+
I am trying to join the two DataFrames on df1.CCODE == df2.COUNTRYCODE, but it's not working:
df1 = df1.alias('df1')
df2 = df2.alias('df2')
tgt_tbl_col='COUNTRYCODE'
src_tbl_col='CCODE'
join_type = 'INNER'
merge_df = df1.join(df2, df2.tgt_tbl_col == df1.src_tbl_col, how=join_type)
Error:
AttributeError: 'DataFrame' object has no attribute 'tgt_tbl_col'
/databricks/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
1332 if name not in self.columns:
1333 raise AttributeError(
-> 1334 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
1335 jc = self._jdf.apply(name)
1336 return Column(jc)
However, the same join works when I make both column names the same and run the below:
merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)
Any suggestions on this?
Versions: Apache Spark 2.4.5, Scala 2.11, Python 3.8
Upvotes: 1
Views: 4419
Reputation: 1171
Please note that tgt_tbl_col is not a column name in df2: df2.tgt_tbl_col looks for a column literally named tgt_tbl_col, hence the AttributeError.
Hence you can do it as below:
from pyspark.sql.functions import col
# col() builds the condition from the variables' values, not their names
merge_df = df1.join(df2, col(tgt_tbl_col) == col(src_tbl_col), how=join_type)
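As a side note (not part of the original answer, just a sketch): bracket indexing on the DataFrame also resolves the variable's value, and it binds each reference to a specific DataFrame, which avoids ambiguity if both inputs ever share a column name:
# df2[tgt_tbl_col] looks up the column named by the variable's value;
# df2.tgt_tbl_col instead looks for a column literally named "tgt_tbl_col"
merge_df = df1.join(df2, df2[tgt_tbl_col] == df1[src_tbl_col], how=join_type)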
Alternatively, you can write it directly like this:
merge_df = df1.join(df2, df1.CCODE == df2.COUNTRYCODE, "inner")
Note: it's fine if you don't specify "inner" in the above statement, as Spark considers an inner join by default.
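For example, the following is equivalent to the statement above:
# No join type given, so Spark performs an inner join by default
merge_df = df1.join(df2, df1.CCODE == df2.COUNTRYCODE)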
In your follow-up you said it works when you write it as below: merge_df = df1.join(df2, on=[tgt_tbl_col], how=join_type)
In that case, you are passing tgt_tbl_col (i.e. 'COUNTRYCODE') as a string; this works only if you rename the CCODE column in df1 to COUNTRYCODE, else it will throw an error.
Note: if both tables use the same name for the column you want to join on, you can pass just that column name as a string instead of spelling out a condition with both tables' column names.
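A small sketch of that string-based form (assuming you are fine with renaming df1's CCODE; withColumnRenamed is a standard DataFrame method):
# Rename df1's key so both sides share the column name
df1_renamed = df1.withColumnRenamed('CCODE', 'COUNTRYCODE')
# Passing the name as a string equi-joins on that column and keeps
# a single COUNTRYCODE column in the result
merge_df = df1_renamed.join(df2, on='COUNTRYCODE', how='inner')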
Upvotes: 1
Reputation: 126
Your code is failing because Spark is treating the variable name itself as a column of the DataFrame instead of its value "COUNTRYCODE", so df2.tgt_tbl_col raises the error. Try the below code, which uses the col function and works. If you are fine with renaming the df1 column CCODE to COUNTRYCODE, then @Sampath's answer will be fine.
Note: make sure the columns in the condition follow the join order, i.e. for df1.join(df2, ...), col1 must be a df1 column and col2 must be a df2 column.
from pyspark.sql import functions as sf
col1 = "CCODE"        # join key in df1
col2 = "COUNTRYCODE"  # join key in df2
condition = sf.col(col1) == sf.col(col2)
join_type = "inner"
df1.join(df2, condition, how=join_type).show()
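One follow-up worth noting (a sketch, not part of the original answer): with a condition-based join, both CCODE and COUNTRYCODE end up in the result, so you may want to drop one of them afterwards:
# Drop the duplicate join key from the result
df1.join(df2, condition, how=join_type).drop(col2).show()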
Upvotes: 0