Reputation: 2241
As far as I know, in a Spark DataFrame multiple columns can have the same name, as shown in the DataFrame snapshot below:
[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]
The above result is created by joining a dataframe with itself; you can see there are 4 columns: two named a and two named f.
The problem is that when I try to do further calculations with the a column, I can't find a way to select it. I have tried df[0] and df.select('a'), and both returned the error message below:
AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.
Is there any way in the Spark API to distinguish the columns with duplicated names again? Or maybe some way to let me change the column names?
Upvotes: 159
Views: 365519
Reputation: 2095
Definitely late to the party, but the approach below works for me with any number of identical column names in a dataframe join. The only way to avoid the issue is to rename the columns in one of the DataFrames.
I am using a lambda with reduce to rename the columns:
dataframe:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | xyz |
|107831 | abc |
+-------+-----+
dataframe:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | efg |
|107831 | jkl |
+-------+-----+
from functools import reduce

df_new = reduce(lambda df, col: df.withColumnRenamed(col, col + '_new'), df1.columns, df1)
df_new.printSchema()
root
|-- a_new: string (nullable = true)
|-- f_new: string (nullable = true)
Now both DataFrames can be joined, since they have different column names:
df_new.join(df2, df_new['a_new'] == df2['a'])
To select only the columns from df_new:
df_new.join(df2, df_new['a_new'] == df2['a']).select(*df_new.columns)
Upvotes: 0
Reputation: 1885
PySpark 3.2.1+
I found a simple way of doing this in Spark 3.2.1 using toDF:
df.show()
+------+------+---------+
|number| word| word|
+------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+------+------+---------+
df = df.toDF(*[val + str(i) for i, val in enumerate(df.columns)])
df.show()
+-------+------+---------+
|number0| word1| word2|
+-------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+-------+------+---------+
Upvotes: 0
Reputation: 43
What worked for me
import databricks.koalas as ks
df1k = df1.to_koalas()
df2k = df2.to_koalas()
df3k = df1k.merge(df2k, on=['col1', 'col2'])
df3 = df3k.to_spark()
All of the columns except for col1 and col2 had "_x" appended to their names if they had come from df1 and "_y" appended if they had come from df2, which is exactly what I needed.
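The behavior comes from koalas' pandas-style merge. A minimal sketch making the suffix choice explicit; to my knowledge, suffixes=('_x', '_y') is the default, spelled out here only for clarity:
# suffixes controls what is appended to overlapping non-key column names
df3k = df1k.merge(df2k, on=['col1', 'col2'], suffixes=('_x', '_y'))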
Upvotes: 0
Reputation: 13154
I would recommend that you change the column names for your join.
df1.select(col("a") as "df1_a", col("f") as "df1_f")
  .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a") === col("df2_a"))
The resulting DataFrame will have the schema (df1_a, df1_f, df2_a, df2_f).
Upvotes: 80
Reputation: 409
If only the key column has the same name in both tables, then try the following way (Approach 1):
left.join(right, 'key', 'inner')
rather than the below (Approach 2):
left.join(right, left.key == right.key, 'inner')
Pros of using Approach 1: the join key appears only once in the output, so selecting it is never ambiguous.
Cons of using Approach 1: it only works when the key column has the same name in both tables.
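A minimal, runnable sketch of the difference (the tiny DataFrames and column names here are illustrative, and an active SparkSession named spark is assumed):
left = spark.createDataFrame([(1, 'x')], ['key', 'l_val'])
right = spark.createDataFrame([(1, 'y')], ['key', 'r_val'])

# Approach 1: the join key appears only once in the result
left.join(right, 'key', 'inner').columns
# ['key', 'l_val', 'r_val']

# Approach 2: both key columns survive, so 'key' is ambiguous afterwards
left.join(right, left.key == right.key, 'inner').columns
# ['key', 'l_val', 'key', 'r_val']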
Upvotes: 6
Reputation: 330413
Let's start with some data:
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])
There are a few ways you can approach this problem. First of all, you can unambiguously reference the child table's columns through the parent DataFrames:
df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
You can also use table aliases:
from pyspark.sql.functions import col
df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")
df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Finally you can programmatically rename columns:
df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
## +--------------------+
## | f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Upvotes: 166
Reputation: 245
If you have a more complicated use case than the one described in Glennie Helles Sindholt's answer, e.g. you have other non-join columns that share names and want to distinguish them while selecting, it's best to use aliases, e.g.:
df3 = df1.select("a", "b").alias("left")\
    .join(df2.select("a", "b").alias("right"), ["a"])\
    .select("left.a", "left.b", "right.b")
df3.columns
['a', 'b', 'b']
Upvotes: 3
Reputation: 110
This might not be the best approach, but if you want to rename the duplicate columns (after the join), you can do so using this tiny function:
def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index] + '2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
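For example, applied to a self-join like the one in the question (a sketch; note that list.index returns the first occurrence, so it is the first copy of each duplicated name that gets the '2' suffix):
joined = df1.join(df2, df1['a'] == df2['a'])
joined.columns                 # ['a', 'f', 'a', 'f']
renamed = rename_duplicate_columns(joined)
renamed.columns                # ['a2', 'f2', 'a', 'f']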
Upvotes: 3
Reputation: 2621
There is a simpler way than writing aliases for all of the columns you are joining on:
df1.join(df2,['a'])
This works if the key that you are joining on is the same in both tables.
See https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
Upvotes: 83
Reputation: 2241
After digging into the Spark API, I found I can first use alias to create an alias for the original dataframe, then use withColumnRenamed to manually rename every column on the alias; this allows the join to proceed without causing column name duplication.
More detail can be found in the Spark DataFrame API:
pyspark.sql.DataFrame.withColumnRenamed
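A minimal sketch of that workaround (the '_2' suffix and the self-join are illustrative choices):
df2 = df.alias('df2')
for c in df2.columns:
    df2 = df2.withColumnRenamed(c, c + '_2')

joined = df.join(df2, df['a'] == df2['a_2'])
joined.columns   # ['a', 'f', 'a_2', 'f_2']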
However, I think this is only a cumbersome workaround, and I am wondering if there is a better way.
Upvotes: 6
Reputation: 1051
This is how we can join two DataFrames on the same column names in PySpark:
df = df1.join(df2, ['col1','col2','col3'])
If you do printSchema() after this, you can see that the duplicate columns have been removed.
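For example (a minimal sketch; the column names, values, and an active SparkSession named spark are illustrative assumptions):
df1 = spark.createDataFrame([(1, 2, 3, 'x')], ['col1', 'col2', 'col3', 'val1'])
df2 = spark.createDataFrame([(1, 2, 3, 'y')], ['col1', 'col2', 'col3', 'val2'])

df = df1.join(df2, ['col1', 'col2', 'col3'])
df.printSchema()
# root
#  |-- col1: long (nullable = true)
#  |-- col2: long (nullable = true)
#  |-- col3: long (nullable = true)
#  |-- val1: string (nullable = true)
#  |-- val2: string (nullable = true)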
Upvotes: 16
Reputation: 91
Suppose the DataFrames you want to join are df1 and df2, and you are joining them on column 'a'. Then you have 2 methods:
Method 1
df1.join(df2,'a','left_outer')
This is an awesome method and it is highly recommended.
Method 2
df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)
Upvotes: 9
Reputation: 772
You can use def drop(col: Column)
method to drop the duplicated column,for example:
DataFrame:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
DataFrame:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
When I join df1 with df2, the DataFrame will look like below:
val newDf = df1.join(df2,df1("a")===df2("a"))
DataFrame:newDf
+-------+-----+-------+-----+
| a | f | a | f |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+
Now, we can use the drop(col: Column) method to drop the duplicated column 'a' or 'f', as follows:
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
Upvotes: 12