Reputation: 1613
Hi, I have two Spark DataFrames.
The first one:
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|segment_comp_11|cluster_comp_170|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|              2|              2|     IT|  41.884|  13.5204| 2019-04-15|d@rNdBkkN-p3|             10|               3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Ie2Bbs9PUR8h|             15|               4|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Jk2Bbs9PUR8h|             15|               4|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
and the second one:
+---------------+---------------+-------+--------+---------+-----------+------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|
+---------------+---------------+-------+--------+---------+-----------+------------+
|              4|             17|     IT| 40.8413|  14.2008| 2019-04-16|ASBuzjKa6nIB|
|              2|              2|     IT|  41.884|  15.5204| 2019-04-16|d@rNdBkkN-p3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-16|Ie2Bbs9PUR8h|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|xyzBbs9PUR8h|
+---------------+---------------+-------+--------+---------+-----------+------------+
Apart from country, latitude, longitude, last_update and uid, the bottom DataFrame could have different columns added.
The idea is to do a full join by uid, update the common columns, and keep the uncommon columns.
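In case someone wants to reproduce it, here is a minimal sketch to build the two frames (assuming an existing SparkSession named spark; the val names match the ones in my answer below):

import spark.implicits._

val tableToUpdate = Seq(
  (2, 2, "IT", 41.884, 13.5204, "2019-04-15", "d@rNdBkkN-p3", 10, 3),
  (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "Ie2Bbs9PUR8h", 15, 4),
  (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "Jk2Bbs9PUR8h", 15, 4)
).toDF("cluster_socio_6", "cluster_socio_7", "country", "latitude",
  "longitude", "last_update", "uid", "segment_comp_11", "cluster_comp_170")

val miniJoin = Seq(
  (4, 17, "IT", 40.8413, 14.2008, "2019-04-16", "ASBuzjKa6nIB"),
  (2, 2, "IT", 41.884, 15.5204, "2019-04-16", "d@rNdBkkN-p3"),
  (16, 15, "IT", 45.5298, 9.03813, "2019-04-16", "Ie2Bbs9PUR8h"),
  (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "xyzBbs9PUR8h")
).toDF("cluster_socio_6", "cluster_socio_7", "country", "latitude",
  "longitude", "last_update", "uid")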
How could I accomplish this task?
Thanks.
Upvotes: 0
Views: 71
Reputation: 1613
I found this solution to avoid the shuffling due to the join.
What do you guys think?
Any improvements or Scala shortcuts I can use?
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

// For every column in allCols, pass it through when the frame has it,
// otherwise fill it with null so both frames end up with the same schema.
def func_union_name(myCols: Set[String], allCols: Set[String]): List[Column] = {
  allCols.toList.map {
    case x if myCols.contains(x) => col(x)
    case x => lit(null).as(x)
  }
}
After defining the function above, I do:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val upper_col = tableToUpdate.columns.toSet
val bottom_col = miniJoin.columns.toSet
val union_cols = upper_col ++ bottom_col

// Align both frames on the union of the columns, stack them,
// then keep only the most recent row(s) per uid.
tableToUpdate
  .select(func_union_name(upper_col, union_cols): _*)
  .union(miniJoin.select(func_union_name(bottom_col, union_cols): _*))
  .withColumn("max_lu", max(col("last_update")).over(Window.partitionBy(col("uid"))))
  .filter(col("last_update").geq(col("max_lu")))
  .drop("max_lu")
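To make it reusable, the same steps could be wrapped in a helper; a minimal sketch (the name updateByUid is mine, and it relies on the imports and func_union_name above):

import org.apache.spark.sql.DataFrame

// Same logic as above, packaged as a function over two arbitrary frames.
def updateByUid(upper: DataFrame, bottom: DataFrame): DataFrame = {
  val upperCols = upper.columns.toSet
  val bottomCols = bottom.columns.toSet
  val unionCols = upperCols ++ bottomCols
  upper.select(func_union_name(upperCols, unionCols): _*)
    .union(bottom.select(func_union_name(bottomCols, unionCols): _*))
    .withColumn("max_lu", max(col("last_update")).over(Window.partitionBy(col("uid"))))
    .filter(col("last_update").geq(col("max_lu")))
    .drop("max_lu")
}

val result = updateByUid(tableToUpdate, miniJoin)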
Upvotes: 0
Reputation: 563
If, as you said in the comments, you always want the common columns from the bottom table, you can do a simple join, dropping the common columns from df1 before joining:
val joined_df = df1.drop("some_common_columns").join(df2, Seq("uid"))
This will leave you with the common columns from df2 and the uncommon columns of both dfs in the new joined_df.
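Spelled out for the question's frames (a sketch, with df1 being the first table, df2 the bottom one, and the drop list just the shared non-key columns):

// Drop the shared columns from df1 so df2's values win after the join.
val joined_df = df1
  .drop("cluster_socio_6", "cluster_socio_7", "country",
    "latitude", "longitude", "last_update")
  .join(df2, Seq("uid"))

Note that the default join type here is inner, so uids present in only one of the frames are lost; pass "full_outer" as a third argument to join if you need to keep them.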
Upvotes: 0
Reputation: 10693
Here's the code (you didn't specify a language, so let's try Scala):
import org.apache.spark.sql.functions.{coalesce, col}

// Your dataframes
val upper = ...
val lower = ...

// Find out the columns
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols

// For shared columns, take lower's value where the uid matched and fall
// back to upper's; disjoint columns exist on one side only and pass through.
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++ disjointCols.map(c => col(c))).toList

// Join and project
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns: _*)
joined.show
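Since the column names travel through Sets, the resulting column order is arbitrary; if you want a stable layout, a small optional sketch (my own addition):

// Put uid first and sort the remaining columns alphabetically.
val ordered = joined.select("uid", joined.columns.filter(_ != "uid").sorted: _*)
ordered.show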
Upvotes: 1