Antony
Antony

Reputation: 1108

Joining two dataframes in scala with a column which are not having exact values

I tried to merge two dataframes with respect to a column which are not having exact same values.

Below given is DF1

+--------+-----+------+
| NUM_ID | TIME|SG1_V |
+--------+-----+------+
|XXXXX01 |1001 |79.0  |
|XXXXX01 |1005 |88.0  |
|XXXXX01 |1010 |99.0  |
|XXXXX01 |1015 |null  |
|XXXXX01 |1020 |100.0 |
|XXXXX02 |1001 |81.0  |
|XXXXX02 |1010 |91.0  |
|XXXXX02 |1050 |93.0  |
|XXXXX02 |1060 |93.0  |
|XXXXX02 |1070 |93.0  |
+--------+-----+------+

Below is DF2

+---------+-----+------+
| NUM_ID  | TIME|SG2_V |
+---------+-----+------+
|XXXXX01  |1001 |  99.0|
|XXXXX01  |1003 |  22.0|
|XXXXX01  |1007 |  85.0|
|XXXXX01  |1011 |  1.0 |

|XXXXX02  |1001 |  22.0|
|XXXXX02  |1009 |  85.0|
|XXXXX02  |1048 |  1.0 |
|XXXXX02  |1052 |  99.0|
+---------+-----+------+

I have to join these two DF on columns NUM_ID, which should be exactly same and on column TIME which may/may not be exact value.

The TIME in DF2 may/may not contain exact value as in the DF1. If the value is not exact then, I have to join with the highest nearest value available (ie - column value in DF2 should be =< Exact Value in DF1).

It will be more clear after looking at the expected output shown below.

+--------+-----+------+-----+------+
| NUM_ID | TIME|SG1_V | TIME|SG2_V |
+--------+-----+------+-----+------+
|XXXXX01 |1001 |79.0  |1001 |  99.0|
|XXXXX01 |1005 |88.0  |1003 |  22.0|
|XXXXX01 |1010 |99.0  |1007 |  85.0|
|XXXXX01 |1015 |null  |1011 |  1.0 |
|XXXXX01 |1020 |100.0 |1011 |  1.0 |

|XXXXX02 |1001 |81.0  |1001 |  22.0|
|XXXXX02 |1010 |91.0  |1009 |  85.0|
|XXXXX02 |1050 |93.0  |1048 |  1.0 |
|XXXXX02 |1060 |93.0  |1052 |  99.0|
|XXXXX02 |1070 |93.0  |1052 |  99.0|
+--------+-----+------+-----+------+

For NUM_ID XXXXX01, TIME(1005) in DF1 is not available in DF2, so it took nearest value (1003) which is smaller than 1005.

How to join in such a way that if exact value is not available, then join with nearest value.

Appreciate any leads. Thanks in Advance.

Upvotes: 1

Views: 649

Answers (3)

Antony
Antony

Reputation: 1108

The above solution is joining the dataframes after saving it into hive table.

I tried to join two dataframes without saving into hive table by applying the same logic and it is as shown below.

val finalSignals = finalABC.as("df1").join(finalXYZ.as("df2"), $"df1.NUM_ID" === $"df2.NUM_ID" && $"df2.TIME"  <= $"df1.TIME", "left").withColumn("rno", row_number.over(Window.partitionBy($"df1.NUM_ID", $"df1.TIME").orderBy($"df1.TIME" - $"df2.TIME"))).select(col("df1.NUM_ID").as("NUM_ID"),col("df1.TIME"),col("df2.NUM_ID").as("NUM_ID2"),col("df1.TIME").as("TIME2"),
col("rno")).filter("rno == 1")

Is this equivalent to the above provided solution

spark.sql("""
     |   SELECT * FROM (
     |     SELECT *,
     |       ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
     |     FROM df1 JOIN df2 
     |     ON df2.NUM_ID = df1.NUM_ID AND 
     |        df2.TIME  <= df1.TIME
     |   ) T
     | WHERE T.rno = 1
     |""")

Upvotes: 0

mazaneicha
mazaneicha

Reputation: 9427

Easy way to do it is by using one of Spark's Window functions, row_number() or rank():

scala> spark.sql("""
     |   SELECT * FROM (
     |     SELECT *,
     |       ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
     |     FROM df1 JOIN df2 
     |     ON df2.NUM_ID = df1.NUM_ID AND 
     |        df2.TIME  <= df1.TIME
     |   ) T
     | WHERE T.rno = 1
     |""").show()
+-------+----+-----+-------+----+-----+---+
| NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG2_V|rno|
+-------+----+-----+-------+----+-----+---+
|XXXXX01|1001| 79.0|XXXXX01|1001| 99.0|  1|
|XXXXX01|1005| 88.0|XXXXX01|1003| 22.0|  1|
|XXXXX01|1010| 99.0|XXXXX01|1007| 85.0|  1|
|XXXXX01|1015| null|XXXXX01|1011|  1.0|  1|
|XXXXX01|1020|100.0|XXXXX01|1011|  1.0|  1|
|XXXXX02|1001| 81.0|XXXXX02|1001| 22.0|  1|
|XXXXX02|1010| 91.0|XXXXX02|1009| 85.0|  1|
+-------+----+-----+-------+----+-----+---+

scala>

Upvotes: 1

Emiliano Martinez
Emiliano Martinez

Reputation: 4133

If you need to join by two field using and specific interval for one of them you can do something like:

  import org.apache.spark.sql.functions.when

  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  val df1 : DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("XXXXX01",1001,79.0),
    Row("XXXXX01",1005,88.0),
    Row("XXXXX01",1010,99.0),
    Row("XXXXX01",1015, null),
    Row("XXXXX01",1020,100.0),
    Row("XXXXX02",1001,81.0))),
    StructType(Seq(StructField("NUM_ID", StringType, false), StructField("TIME", IntegerType, false), StructField("SG1_V", DoubleType, true))))

  val df2 : DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("XXXXX01",1001,79.0),
    Row("XXXXX01",1001, 99.0),
    Row("XXXXX01",1003, 22.0),
    Row("XXXXX01",1007, 85.1),
    Row("XXXXX01",1011, 1.0),
    Row("XXXXX02",1001,22.0))),
    StructType(Seq(StructField("NUM_ID", StringType, false), StructField("TIME", IntegerType, false), StructField("SG1_V", DoubleType, false))))

  val interval : Int = 10

  def main(args: Array[String]) : Unit = {
    df1.join(df2, ((df1("TIME")) - df2("TIME") > lit(interval)) && df1("NUM_ID") === df2("NUM_ID")).show()
  } 

It gives as result:

+-------+----+-----+-------+----+-----+
| NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG1_V|
+-------+----+-----+-------+----+-----+
|XXXXX01|1015| null|XXXXX01|1001| 79.0|
|XXXXX01|1015| null|XXXXX01|1001| 99.0|
|XXXXX01|1015| null|XXXXX01|1003| 22.0|
|XXXXX01|1020|100.0|XXXXX01|1001| 79.0|
|XXXXX01|1020|100.0|XXXXX01|1001| 99.0|
|XXXXX01|1020|100.0|XXXXX01|1003| 22.0|
|XXXXX01|1020|100.0|XXXXX01|1007| 85.1|
+-------+----+-----+-------+----+-----+

Upvotes: 2

Related Questions