Reputation: 333
I have 3 dataframes in Spark: dataframe1, dataframe2 and dataframe3.
I want to join dataframe1 with another dataframe based on a condition.
I use the following code:
Dataset<Row> df = dataframe1.filter(
        when(col("diffDate").lt(3888),
                dataframe1.join(dataframe2,
                        dataframe2.col("id_device").equalTo(dataframe1.col("id_device"))
                                .and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                                .and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))))
                        .orderBy(dataframe2.col("tracking_time").desc()))
        .otherwise(
                dataframe1.join(dataframe3,
                        dataframe3.col("id_device").equalTo(dataframe1.col("id_device"))
                                .and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                                .and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time"))))
                        .orderBy(dataframe3.col("tracking_time").desc())));
But I get this exception:
Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Dataset
EDIT
Input dataframes:
dataframe1
+--------+---------+-----------+-------------+
|diffDate|id_device|id_vehicule|tracking_time|
+--------+---------+-----------+-------------+
|222     |1        |5          |2020-05-30   |
|4700    |8        |9          |2019-03-01   |
+--------+---------+-----------+-------------+
dataframe2
+---------+-----------+-------------+---------+
|id_device|id_vehicule|tracking_time|longitude|
+---------+-----------+-------------+---------+
|1        |5          |2020-05-12   |33.21111 |
|8        |9          |2019-03-01   |20.2222  |
+---------+-----------+-------------+---------+
dataframe3
+---------+-----------+-------------+--------+
|id_device|id_vehicule|tracking_time|latitude|
+---------+-----------+-------------+--------+
|1        |5          |2020-05-12   |40.333  |
|8        |9          |2019-02-28   |2.00000 |
+---------+-----------+-------------+--------+
Expected output when diffDate < 3888 (join with dataframe2):
+--------+---------+-----------+-------------+---------+-----------+-------------+---------+
|diffDate|id_device|id_vehicule|tracking_time|id_device|id_vehicule|tracking_time|longitude|
+--------+---------+-----------+-------------+---------+-----------+-------------+---------+
|222     |1        |5          |2020-05-30   |1        |5          |2020-05-12   |33.21111 |
+--------+---------+-----------+-------------+---------+-----------+-------------+---------+
Expected output when diffDate > 3888 (join with dataframe3):
+--------+---------+-----------+-------------+---------+-----------+-------------+--------+
|diffDate|id_device|id_vehicule|tracking_time|id_device|id_vehicule|tracking_time|latitude|
+--------+---------+-----------+-------------+---------+-----------+-------------+--------+
|4700    |8        |9          |2019-03-01   |8        |9          |2019-02-28   |2.00000 |
+--------+---------+-----------+-------------+---------+-----------+-------------+--------+
I need your help.
Thank you.
Upvotes: 1
Views: 2164
Reputation: 6338
I think you need to revisit your code.
You are trying to execute a join for each row of dataframe1 (based on the condition), which I think is either an incorrect or a misunderstood requirement.
The when(condition, then).otherwise(else) function builds a column expression that is evaluated for each row of the underlying dataframe, and it is generally used to derive a column based on a condition. The then and otherwise branches only accept column expressions (existing columns of the dataframe, of primitive or complex types) and literals. You can't put a dataframe, or any operation that returns a dataframe, there.
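To make the distinction concrete, here is a minimal sketch of a valid use of when/otherwise (the column names range, recent and old are just for illustration):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

// valid: both branches are literals, and the expression is evaluated row by row
Dataset<Row> labelled = dataframe1.withColumn("range",
        when(col("diffDate").lt(3888), lit("recent"))
                .otherwise(lit("old")));

// invalid: a Dataset is not a literal, which is what triggers
// "Unsupported literal type class org.apache.spark.sql.Dataset"
// when(col("diffDate").lt(3888), dataframe1.join(dataframe2, ...))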
Maybe your requirement is to join dataframe1 with dataframe2 for the rows where col("diffDate").lt(3888). To achieve this you can do the following -
dataframe1.join(dataframe2,
        dataframe2.col("id_device").equalTo(dataframe1.col("id_device"))
                .and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule")))
                .and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))
                // the diffDate check becomes part of the join condition
                .and(dataframe1.col("diffDate").lt(3888)))
        .orderBy(dataframe2.col("tracking_time").desc())
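Equivalently, you could filter dataframe1 on the diffDate condition before joining; for an inner join the result is the same. A sketch (the variable names recent and joined are just for illustration, and the usual static import of org.apache.spark.sql.functions.col is assumed):
import static org.apache.spark.sql.functions.col;

// keep only the rows where diffDate < 3888, then join on the device/vehicle keys
Dataset<Row> recent = dataframe1.filter(col("diffDate").lt(3888));
Dataset<Row> joined = recent.join(dataframe2,
        dataframe2.col("id_device").equalTo(recent.col("id_device"))
                .and(dataframe2.col("id_vehicule").equalTo(recent.col("id_vehicule")))
                .and(dataframe2.col("tracking_time").lt(recent.col("tracking_time"))))
        .orderBy(dataframe2.col("tracking_time").desc());
Whether you push the check into the join condition or filter first is mostly a readability choice.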
dataframe1.as("a").join(dataframe2.as("b"),
dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))).
and(dataframe1.col("diffDate").lt(3888))
).selectExpr("a.*", "b.longitude", "null as latitude")
.unionByName(
dataframe1.as("a").join(dataframe3.as("c"),
dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time"))).
and(dataframe1.col("diffDate").geq(3888))
).selectExpr("a.*", "c.latitude", "null as longitude")
)
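Since the two joins differ only in the right-hand dataframe, the diffDate check and the projected column, you could also factor them into a small helper. A sketch (the helper name joinSide, the result variable, and the cast of the placeholder column to double are illustrative choices, not requirements):
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// both branches share the same key conditions; only the right-hand dataframe,
// the diffDate check and the projected column differ
private static Dataset<Row> joinSide(Dataset<Row> left, Dataset<Row> right,
                                     Column diffDateCheck,
                                     String valueCol, String missingCol) {
    return left.as("a").join(right.as("b"),
            right.col("id_device").equalTo(left.col("id_device"))
                    .and(right.col("id_vehicule").equalTo(left.col("id_vehicule")))
                    .and(right.col("tracking_time").lt(left.col("tracking_time")))
                    .and(diffDateCheck))
            // cast the placeholder to double so both union sides carry an explicit type
            .selectExpr("a.*", "b." + valueCol, "cast(null as double) as " + missingCol);
}

// usage:
Dataset<Row> result =
        joinSide(dataframe1, dataframe2, dataframe1.col("diffDate").lt(3888), "longitude", "latitude")
        .unionByName(
        joinSide(dataframe1, dataframe3, dataframe1.col("diffDate").geq(3888), "latitude", "longitude"));
result.show(false);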
Upvotes: 1