Reputation: 117
I have two DataFrames, df1 (Employee table) and df2 (Department table), with the following schemas:
df1.columns
// Array(id,name,dept_id)
and
df2.columns
// Array(id,name)
After I join these two tables on df1.dept_id and df2.id:
val joinedData = df1.join(df2,df1("dept_id")===df2("id"))
joinedData.columns
// Array(id,name,dept_id,id,name)
When I save the result to a file:
joinedData.write.csv("<path>")
it fails with this error:
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "name", "id" found, cannot save to file.;
I read about passing a Seq of column names (strings) to join to avoid duplicate columns, but that only works for the columns the join is performed on. I need similar functionality for the non-join columns.
Is there a direct way to qualify repeated columns with their table name so that the result can be saved?
My current solution is to compare the column lists of both DataFrames and rename the duplicated columns by prefixing them with the table name. But is there a more direct way?
Note: this will be generic code in which only the join columns are known in advance. The remaining columns are known only at runtime, so we can't rename them by hard-coding their names.
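The renaming approach described above can be done without hard-coding any names: compute the column names both sides share at runtime and prefix only those on one side. A minimal pure-Scala sketch of the name computation; the helper `renameCommon` and the `df2` prefix are hypothetical choices for illustration.

```scala
// Hypothetical helper: given the column names of both DataFrames (known only
// at runtime), return the right-hand names with the shared ones prefixed.
def renameCommon(leftCols: Array[String], rightCols: Array[String], prefix: String): Array[String] = {
  val common = leftCols.toSet.intersect(rightCols.toSet)
  rightCols.map(c => if (common.contains(c)) s"${prefix}_$c" else c)
}

val employeeCols   = Array("id", "name", "dept_id")
val departmentCols = Array("id", "name")

// Only "id" and "name" collide, so both get the prefix
val renamedDeptCols = renameCommon(employeeCols, departmentCols, "df2")
```

The computed names could then be applied with `df2.toDF(renamedDeptCols: _*)`, since `Dataset.toDF(colNames: String*)` renames columns positionally.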
Upvotes: 2
Views: 5060
Reputation: 5880
You could try aliasing the DataFrames:
import spark.implicits._
df1.as("df1")
.join(df2.alias("df2"),df1("dept_id") === df2("id"))
.select($"df1.*",$"df2.*").show()
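Note that `$"df1.*"` and `$"df2.*"` keep the original column names, so the selected result still contains `id` and `name` twice and would hit the same error on write. A sketch that keeps the aliases but also renames every output column generically from the runtime column lists, assuming a local SparkSession and hypothetical sample rows:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("alias-rename").getOrCreate()
import spark.implicits._

// Hypothetical sample data matching the schemas in the question
val df1 = Seq((1, "alice", 10), (2, "bob", 20)).toDF("id", "name", "dept_id")
val df2 = Seq((10, "sales"), (20, "hr")).toDF("id", "name")

// Qualify each column by its DataFrame alias and rename it with the same prefix,
// so no column name is hard-coded
val projection =
  df1.columns.map(c => col(s"df1.$c").as(s"df1_$c")) ++
  df2.columns.map(c => col(s"df2.$c").as(s"df2_$c"))

val result = df1.as("df1")
  .join(df2.as("df2"), col("df1.dept_id") === col("df2.id"))
  .select(projection: _*)
```

Every output name is now unique (`df1_id`, `df1_name`, `df1_dept_id`, `df2_id`, `df2_name`), so `result.write.csv(...)` no longer has duplicates to reject.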
Upvotes: 1
Reputation: 11489
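The linked Databricks article illustrates the problem in Scala, where joining on an expression keeps both name columns:
val llist = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23",10))
val left = llist.toDF("name","date","duration")
val right = Seq(("alice", 100),("bob", 23)).toDF("name","upload")
val df = left.join(right, left.col("name") === right.col("name"))
display(df) // Databricks notebook helper; outside Databricks use df.show()
In SparkR, the duplicate join column can be dropped after the join:
head(drop(join(left, right, left$name == right$name), left$name))
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html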
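A Scala sketch of two ways to end up with a single `name` column for the example above: passing the key as a `Seq` (which, as the question notes, only helps for join columns), or dropping the right-hand copy with `drop`, mirroring the SparkR line. The local SparkSession setup is an assumption.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dedup-join-key").getOrCreate()
import spark.implicits._

val left  = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23", 10)).toDF("name", "date", "duration")
val right = Seq(("alice", 100), ("bob", 23)).toDF("name", "upload")

// Option 1: join on a Seq of column names; Spark keeps a single "name" column
val usingSeq = left.join(right, Seq("name"))

// Option 2: join on an expression, then drop the right-hand copy of "name"
val dropped = left.join(right, left("name") === right("name")).drop(right("name"))
```

Neither option helps with non-key columns that share a name, which is the case the question is about.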
Upvotes: 0
Reputation: 117
After further research and feedback from other developers, it seems there is no direct way. One option is to rename all columns, as suggested by @Raphael, but I solved my problem by renaming only the duplicate columns:
val commonCols = df1.columns.intersect(df2.columns)
val newDf2 = changeColumnsName(df2,commonCols,"df2")
where changeColumnsName is defined as:
import scala.annotation.tailrec
import org.apache.spark.sql.DataFrame

// Prefixes each listed column with "<tableName>_", one column per recursive call
@tailrec
def changeColumnsName(dataFrame: DataFrame, columns: Array[String], tableName: String): DataFrame = {
  if (columns.isEmpty)
    dataFrame
  else
    changeColumnsName(dataFrame.withColumnRenamed(columns.head, tableName + "_" + columns.head), columns.tail, tableName)
}
Now perform the join:
val joinedData = df1.join(newDf2,df1("dept_id")===newDf2("df2_id"))
joinedData.columns
// Array(id,name,dept_id,df2_id,df2_name)
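The same renaming can also be written without explicit recursion by folding `withColumnRenamed` over the common columns. A minimal self-contained sketch, assuming a local SparkSession and hypothetical sample rows; the `df2_` prefix matches the join condition above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("rename-common").getOrCreate()
import spark.implicits._

// Hypothetical sample data matching the schemas in the question
val df1 = Seq((1, "alice", 10), (2, "bob", 20)).toDF("id", "name", "dept_id")
val df2 = Seq((10, "sales"), (20, "hr")).toDF("id", "name")

// Column names present on both sides, computed at runtime
val commonCols = df1.columns.intersect(df2.columns)

// Rename each common column of df2 in turn; same effect as the @tailrec helper
val newDf2: DataFrame = commonCols.foldLeft(df2) { (df, c) =>
  df.withColumnRenamed(c, s"df2_$c")
}

val joinedData = df1.join(newDf2, df1("dept_id") === newDf2("df2_id"))
```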
Upvotes: 1
Reputation: 27383
I would just keep all columns by making sure they have different names, e.g. by prepending an identifier to the column names:
import org.apache.spark.sql.functions.col
import spark.implicits._ // for the $"..." column syntax
val df1Cols = df1.columns
val df2Cols = df2.columns
// prefixes to column names
val df1pf = df1.select(df1Cols.map(n => col(n).as("df1_"+n)):_*)
val df2pf = df2.select(df2Cols.map(n => col(n).as("df2_"+n)):_*)
df1pf.join(df2pf,
$"df1_dept_id"===$"df2_id"
)
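Since `Dataset.toDF(colNames: String*)` renames columns positionally, the same prefixing can also be done without building `Column` expressions. A sketch assuming a local SparkSession and hypothetical sample rows; the output path for `write.csv` is left out, as in the question.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("prefix-all").getOrCreate()
import spark.implicits._

// Hypothetical sample data matching the schemas in the question
val df1 = Seq((1, "alice", 10), (2, "bob", 20)).toDF("id", "name", "dept_id")
val df2 = Seq((10, "sales"), (20, "hr")).toDF("id", "name")

// Prefix every column positionally; no names are hard-coded
val df1pf = df1.toDF(df1.columns.map("df1_" + _): _*)
val df2pf = df2.toDF(df2.columns.map("df2_" + _): _*)

val joined = df1pf.join(df2pf, $"df1_dept_id" === $"df2_id")
```

Prefixing every column costs slightly longer names but guarantees uniqueness even for columns that only collide in some runs.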
Upvotes: 6