HBoulmi

Reputation: 333

Join two dataframes based on a condition using Spark / Java

I have three dataframes in Spark: dataframe1, dataframe2 and dataframe3.

I want to join dataframe1 with one of the other two dataframes, depending on a condition.

I use the following code:

Dataset <Row> df= dataframe1.filter(when(col("diffDate").lt(3888),dataframe1.join(dataframe2,
            dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
            and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
            and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe2.col("tracking_time").desc())).
                   otherwise(dataframe1.join(dataframe3,
                   dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
                           and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                           and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe3.col("tracking_time").desc())));

But I get this exception:

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Dataset

EDIT

Input dataframes :

dataframe1

+-----------+-------------+-------------+-------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|
+-----------+-------------+-------------+-------------+
|222        |1            |5            |2020-05-30   |          
|4700       |8            |9            |2019-03-01   |
+-----------+-------------+-------------+-------------+

dataframe2

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|longitude    |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 33.21111    |       
|8          |9            |2019-03-01   |20.2222      |
+-----------+-------------+-------------+-------------+

dataframe3

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|latitude     |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 40.333      |       
|8          |9            |2019-02-28   |2.00000      |
+-----------+-------------+-------------+-------------+

Expected output when diffDate < 3888:

+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|id_device  |id_vehicule  |tracking_time|longitude    |
+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+
|222        |1            |5            |2020-05-30   |1          |5            |2020-05-12   |33.21111     |
+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+

Expected output when diffDate > 3888:

+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|id_device  |id_vehicule  |tracking_time|latitude     |
+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+
|4700       |8            |9            |2019-03-01   |8          |9            |2019-02-28   |2.00000      |
+-----------+-------------+-------------+-------------+-----------+-------------+-------------+-------------+

I need your help

Thank you.

Upvotes: 1

Views: 2164

Answers (1)

Som

Reputation: 6338

I think you need to revisit your code.

You are trying to execute a join for each row of dataframe1 (depending on the condition), which I think is either an incorrect or a misunderstood requirement.

The when(condition, then).otherwise(...) function is evaluated for each row of the underlying dataframe and is generally used to compute a column value based on a condition. Its then and otherwise arguments only accept column expressions (existing columns of the dataframe, of primitive or complex type) and literals. You can't pass a dataframe, or any operation that returns a dataframe, there. Spark tries to turn the Dataset you passed into a literal, which is why it fails with "Unsupported literal type class org.apache.spark.sql.Dataset".
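
For contrast, here is a minimal sketch of what when(...).otherwise(...) is meant for: producing one value per row. It assumes dataframe1 has the diffDate column from the question; the class name, the method name and the lookup_source column are made up for illustration and are not part of the original code.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

// Hypothetical helper, only to illustrate row-level use of when/otherwise.
final class WhenOtherwiseSketch {

    // Adds a string column saying which lookup dataframe each row would be
    // routed to. Both branches are literals, so every row gets a plain value.
    // "lookup_source" is an assumed column name, not one from the question.
    static Dataset<Row> tagLookupSource(Dataset<Row> dataframe1) {
        return dataframe1.withColumn(
                "lookup_source",
                when(col("diffDate").lt(3888), lit("dataframe2"))
                        .otherwise(lit("dataframe3")));
    }
}
```

Calling WhenOtherwiseSketch.tagLookupSource(dataframe1) returns dataframe1 with one extra string column. Both branches are values Spark can turn into a column expression, which a Dataset is not.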

Maybe your requirement is to join dataframe1 with dataframe2 for the rows where col("diffDate").lt(3888). To achieve this, you can do the following:

// Inner join: the diffDate test is part of the join condition,
// so only dataframe1 rows with diffDate < 3888 can match.
Dataset<Row> joined = dataframe1.join(dataframe2,
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device"))
                        .and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                        .and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))
                        .and(dataframe1.col("diffDate").lt(3888)))
        .orderBy(dataframe2.col("tracking_time").desc());

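Edit-1

To cover both cases in one result, run one conditional join per lookup dataframe and union the two results, filling the column that does not apply with null:

// Rows with diffDate < 3888 take longitude from dataframe2; latitude is null.
Dataset<Row> result = dataframe1.as("a").join(dataframe2.as("b"),
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device"))
                        .and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                        .and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))
                        .and(dataframe1.col("diffDate").lt(3888))
        ).selectExpr("a.*", "b.longitude", "null as latitude")
        // unionByName matches columns by name, so the different column order is fine.
        .unionByName(
                // Rows with diffDate >= 3888 take latitude from dataframe3; longitude is null.
                dataframe1.as("a").join(dataframe3.as("c"),
                        dataframe3.col("id_device").equalTo(dataframe1.col("id_device"))
                                .and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                                .and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time")))
                                .and(dataframe1.col("diffDate").geq(3888))
                ).selectExpr("a.*", "c.latitude", "null as longitude")
        );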
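
To check the routing rule against the sample rows from the question without starting Spark, the same logic can be modelled in plain Java. This is only a sketch of the semantics (two filtered inner joins whose results are concatenated), not Spark code; the class, record and method names are invented for this illustration.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "two filtered joins, then union"; no Spark involved.
final class ConditionalJoinModel {

    // One row of dataframe1 (fields follow the question's columns).
    record Event(int diffDate, int idDevice, int idVehicule, LocalDate trackingTime) {}

    // One row of dataframe2 or dataframe3; value holds longitude or latitude.
    record Position(int idDevice, int idVehicule, LocalDate trackingTime, double value) {}

    // One output row; exactly one of longitude and latitude is non-null.
    record Joined(Event event, Double longitude, Double latitude) {}

    static final int THRESHOLD = 3888;

    // Same predicate as the join condition: equal ids and an earlier tracking_time.
    static boolean matches(Event e, Position p) {
        return p.idDevice() == e.idDevice()
                && p.idVehicule() == e.idVehicule()
                && p.trackingTime().isBefore(e.trackingTime());
    }

    static List<Joined> join(List<Event> df1, List<Position> df2, List<Position> df3) {
        List<Joined> out = new ArrayList<>();
        // First branch: diffDate < 3888 is matched against dataframe2 (longitude).
        for (Event e : df1) {
            if (e.diffDate() < THRESHOLD) {
                for (Position p : df2) {
                    if (matches(e, p)) out.add(new Joined(e, p.value(), null));
                }
            }
        }
        // Second branch: diffDate >= 3888 is matched against dataframe3 (latitude).
        for (Event e : df1) {
            if (e.diffDate() >= THRESHOLD) {
                for (Position p : df3) {
                    if (matches(e, p)) out.add(new Joined(e, null, p.value()));
                }
            }
        }
        return out;
    }

    // Runs the model on the sample rows given in the question.
    static List<Joined> sample() {
        List<Event> df1 = List.of(
                new Event(222, 1, 5, LocalDate.parse("2020-05-30")),
                new Event(4700, 8, 9, LocalDate.parse("2019-03-01")));
        List<Position> df2 = List.of(
                new Position(1, 5, LocalDate.parse("2020-05-12"), 33.21111),
                new Position(8, 9, LocalDate.parse("2019-03-01"), 20.2222));
        List<Position> df3 = List.of(
                new Position(1, 5, LocalDate.parse("2020-05-12"), 40.333),
                new Position(8, 9, LocalDate.parse("2019-02-28"), 2.0));
        return join(df1, df2, df3);
    }
}
```

On the sample data, ConditionalJoinModel.sample() yields two rows: the diffDate 222 row paired with longitude 33.21111 from dataframe2, and the diffDate 4700 row paired with latitude 2.0 from dataframe3. Unlike the Spark version, the model fixes the row order by running the two branches one after the other.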

Upvotes: 1
