Reputation: 73
I have two dataframes df1 and df2. Both have a column 'date' as shown below.
Structure of df1
+----------+
| date|
+----------+
|02-01-2015|
|02-02-2015|
|02-03-2015|
+----------+
Structure of df2
+---+-------+-----+----------+
| ID|feature|value| date|
+---+-------+-----+----------+
| 1|balance| 100|01-01-2015|
| 1|balance| 100|05-01-2015|
| 1|balance| 100|30-01-2015|
| 1|balance| 100|01-02-2015|
| 1|balance| 100|01-03-2015|
+---+-------+-----+----------+
I have to take each row in the 'date' column of df1, compare it with the 'date' column of df2, and get all rows from df2 whose date is less than the date from df1.
Say we take the first row, 02-01-2015, from df1 and get all rows from df2 with a date less than 02-01-2015, which should produce the following output:
+---+-------+-----+----------+
| ID|feature|value| date|
+---+-------+-----+----------+
| 1|balance| 100|01-01-2015|
+---+-------+-----+----------+
What is the best way to achieve this in spark-scala? I have hundreds of millions of rows. I thought of using a window function in Spark, but a window is limited to a single dataframe.
Upvotes: 1
Views: 41476
Reputation: 27373
This gets you all the results in a new dataframe:
import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp}
import spark.implicits._

// parse the dd-MM-yyyy strings into timestamps so the comparison is chronological
val df1 = Seq(
  "02-01-2015",
  "02-02-2015",
  "02-03-2015"
).toDF("date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

val df2 = Seq(
  (1, "balance", 100, "01-01-2015"),
  (1, "balance", 100, "05-01-2015"),
  (1, "balance", 100, "30-01-2015"),
  (1, "balance", 100, "01-02-2015"),
  (1, "balance", 100, "01-03-2015")
).toDF("ID", "feature", "value", "date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

// non-equi join: every df1 date is paired with all earlier df2 rows
df1.join(
  df2, df2("date") < df1("date"), "left"
).show()
+-------------------+---+-------+-----+-------------------+
| date| ID|feature|value| date|
+-------------------+---+-------+-----+-------------------+
|2015-01-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00|
|2015-02-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00|
|2015-03-02 00:00:00| 1|balance| 100|2015-03-01 00:00:00|
+-------------------+---+-------+-----+-------------------+
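Since the join condition is a range comparison rather than an equality, Spark falls back to a nested-loop strategy for it. If df1 (the list of cutoff dates) is small, a broadcast hint keeps the large df2 from being shuffled. A minimal sketch of that variant, written as an inner join from the df2 side and assuming every df1 date has at least one match:
import org.apache.spark.sql.functions.broadcast

// broadcast the small date list so the large df2 stays partitioned in place;
// an inner join keeps only df1 dates that actually have earlier df2 rows
df2.join(broadcast(df1), df2("date") < df1("date"))
  .show()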
EDIT: to get the number of matching records from df2, do:
df1.join(
  df2, df2("date") < df1("date"), "left"
)
  .groupBy(df1("date"))
  .count
  .orderBy(df1("date"))
  .show
+-------------------+-----+
| date|count|
+-------------------+-----+
|2015-01-02 00:00:00| 1|
|2015-02-02 00:00:00| 4|
|2015-03-02 00:00:00| 5|
+-------------------+-----+
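One caveat with the left join above: a df1 date with no earlier df2 rows still produces a single all-null row, so its count shows up as 1 instead of 0. Counting a df2 column instead of rows sidesteps that, since count(column) ignores nulls; a small sketch:
import org.apache.spark.sql.functions.count

df1.join(
  df2, df2("date") < df1("date"), "left"
)
  .groupBy(df1("date"))
  .agg(count(df2("ID")).as("count"))  // null IDs from unmatched dates are not counted
  .orderBy(df1("date"))
  .show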
Upvotes: 3
Reputation: 41957
If you are looking to compare only one row of df1 with df2's date column, then you should first select the intended row from df1:
val oneRowDF1 = df1.select($"date".as("date2")).where($"date" === "02-01-2015")
Then you should join with the logic you have, as:
df2.join(oneRowDF1, unix_timestamp(df2("date"), "dd-MM-yyyy") < unix_timestamp(oneRowDF1("date2"), "dd-MM-yyyy"))
.drop("date2")
which should give you
+---+-------+-----+----------+
|ID |feature|value|date |
+---+-------+-----+----------+
|1 |balance|100 |01-01-2015|
+---+-------+-----+----------+
Updated
Joins are expensive, as they require shuffling data between executors on different nodes.
You can simply use the filter function as below:
val oneRowDF1 = df1.select(unix_timestamp($"date", "dd-MM-yyyy").as("date2")).where($"date" === "02-01-2015")
df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < oneRowDF1.take(1)(0)(0))
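Note that take(1)(0)(0) returns the cutoff value as an untyped Any. A slightly more explicit variant collects it into a typed local value first (assuming the where clause matches exactly one row of df1):
// unix_timestamp produces a LongType column, so the cutoff can be read as a Long
val cutoff = oneRowDF1.head().getAs[Long]("date2")
df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < cutoff)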
I hope the answer is helpful.
Upvotes: 0