3nomis

Reputation: 1613

Spark Join dataframes and conditionally update columns

Hi, I have 2 Spark dataframes.
The first one:

+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|segment_comp_11|cluster_comp_170|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+
|              2|              2|     IT|  41.884|  13.5204| 2019-04-15|d@rNdBkkN-p3|             10|               3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Ie2Bbs9PUR8h|             15|               4|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|Jk2Bbs9PUR8h|             15|               4|
+---------------+---------------+-------+--------+---------+-----------+------------+---------------+----------------+  

and the second one:

+---------------+---------------+-------+--------+---------+-----------+------------+
|cluster_socio_6|cluster_socio_7|country|latitude|longitude|last_update|         uid|
+---------------+---------------+-------+--------+---------+-----------+------------+
|              4|             17|     IT| 40.8413|  14.2008| 2019-04-16|ASBuzjKa6nIB|
|              2|              2|     IT|  41.884|  15.5204| 2019-04-16|d@rNdBkkN-p3|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-16|Ie2Bbs9PUR8h|
|             16|             15|     IT| 45.5298|  9.03813| 2019-04-15|xyzBbs9PUR8h|
+---------------+---------------+-------+--------+---------+-----------+------------+  
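
For reference, a minimal sketch that rebuilds the two sample dataframes (this assumes a SparkSession named spark is in scope, and the column types are guessed from the output above):

    import spark.implicits._

    val upper = Seq(
      (2, 2, "IT", 41.884, 13.5204, "2019-04-15", "d@rNdBkkN-p3", 10, 3),
      (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "Ie2Bbs9PUR8h", 15, 4),
      (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "Jk2Bbs9PUR8h", 15, 4)
    ).toDF("cluster_socio_6", "cluster_socio_7", "country", "latitude", "longitude",
           "last_update", "uid", "segment_comp_11", "cluster_comp_170")

    val bottom = Seq(
      (4, 17, "IT", 40.8413, 14.2008, "2019-04-16", "ASBuzjKa6nIB"),
      (2, 2, "IT", 41.884, 15.5204, "2019-04-16", "d@rNdBkkN-p3"),
      (16, 15, "IT", 45.5298, 9.03813, "2019-04-16", "Ie2Bbs9PUR8h"),
      (16, 15, "IT", 45.5298, 9.03813, "2019-04-15", "xyzBbs9PUR8h")
    ).toDF("cluster_socio_6", "cluster_socio_7", "country", "latitude", "longitude",
           "last_update", "uid")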

Apart from country, latitude, longitude, last_update and uid, the bottom DF could have different columns added.
The idea is to make a full outer join by uid, update the common columns and keep the uncommon ones.
How could I accomplish this task? Thanks.

Upvotes: 0

Views: 71

Answers (3)

3nomis

Reputation: 1613

I found this solution to avoid the shuffling caused by the join.
What do you guys think?
Any improvements or Scala shortcuts I could use?

    import org.apache.spark.sql.functions.{col, lit}

    // Keep each column the dataframe actually has; null-pad the rest
    // so both sides end up with the same union schema.
    def func_union_name(myCols: Set[String], allCols: Set[String]) =
      allCols.toList.map {
        case x if myCols.contains(x) => col(x)
        case x => lit(null).as(x)
      }

After defining the function above, I do:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.max

    val upper_col = upper.columns.toSet
    val bottom_col = bottom.columns.toSet
    val union_cols = upper_col ++ bottom_col

    upper
      .select(func_union_name(upper_col, union_cols): _*)
      .union(bottom.select(func_union_name(bottom_col, union_cols): _*))
      // For each uid, keep only the row(s) with the latest last_update.
      .withColumn("max_lu", max(col("last_update")).over(Window.partitionBy(col("uid"))))
      .filter(col("last_update").geq(col("max_lu")))
      .drop("max_lu")
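
As a possible shortcut: if you can use Spark 3.1 or later, unionByName with allowMissingColumns = true does the null-padding for you, so func_union_name is no longer needed. A sketch of the same pipeline under that assumption:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, max}

    // Spark 3.1+: columns missing on either side are null-padded automatically.
    upper
      .unionByName(bottom, allowMissingColumns = true)
      .withColumn("max_lu", max(col("last_update")).over(Window.partitionBy(col("uid"))))
      .filter(col("last_update").geq(col("max_lu")))
      .drop("max_lu")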

Upvotes: 0

RefiPeretz

Reputation: 563

If, as you said in the comments, you always want the common columns from the bottom table, you can do a simple join after dropping the common columns from df1.

val joined_df = df1.drop("some_common_columns").join(df2, Seq("uid"))

This will leave you with the common columns taken from df2 and the uncommon columns of both dfs in the new joined_df.
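
If the common columns are not known up front, you can derive them instead of hard-coding the list. A sketch, assuming "uid" is the join key (commonCols is just an illustrative name):

    // Every column present in both dataframes, except the join key.
    val commonCols = df1.columns.toSet.intersect(df2.columns.toSet) - "uid"
    val joined_df = df1.drop(commonCols.toSeq: _*).join(df2, Seq("uid"))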

Upvotes: 0

Kombajn zbożowy

Reputation: 10693

Here's the code (you didn't specify a language, so let's try Scala):

// Your dataframes
val upper = ...
val lower = ...

// Find out the columns
import org.apache.spark.sql.functions.{coalesce, col}
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols
// For shared columns prefer lower's value, falling back to upper's
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++
               disjointCols.map(c => col(c))).toList

// Join and project
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns: _*)
joined.show
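
Note that coalesce(lower.col(c), upper.col(c)) takes the bottom dataframe's value for a shared column whenever it is non-null, which gives the "update the common columns" behaviour; a row that exists on only one side keeps that side's values, with nulls in the columns contributed by the other side.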

Upvotes: 1
