Kirupa

Reputation: 73

How to count the number of rows in a Spark dataframe based on a value (primary key) from another dataframe?

I have two dataframes df1 and df2. Both have a column 'date' as shown below.

Structure of df1

+----------+
|      date|
+----------+
|02-01-2015|
|02-02-2015|
|02-03-2015|
+----------+

Structure of df2

+---+-------+-----+----------+
| ID|feature|value|      date|
+---+-------+-----+----------+
|  1|balance|  100|01-01-2015|
|  1|balance|  100|05-01-2015|
|  1|balance|  100|30-01-2015|
|  1|balance|  100|01-02-2015|
|  1|balance|  100|01-03-2015|
+---+-------+-----+----------+

I have to take each row in the 'date' column of df1, compare it with the 'date' column of df2, and get all rows from df2 whose date is less than the date in df1.

Say we take the first row, 02-01-2015, from df1 and get all rows from df2 with dates less than 02-01-2015, which should produce the following output:

+---+-------+-----+----------+
| ID|feature|value|      date|
+---+-------+-----+----------+
|  1|balance|  100|01-01-2015|
+---+-------+-----+----------+ 

What is the best way to achieve this in Spark/Scala? I have hundreds of millions of rows. I thought of using a window function in Spark, but a window is limited to a single dataframe.

Upvotes: 1

Views: 41476

Answers (2)

Raphael Roth

Reputation: 27373

This gets you all results in a new dataframe:

val df1 = Seq(
  "02-01-2015",
  "02-02-2015",
  "02-03-2015"
).toDF("date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

val df2 = Seq(
  (1, "balance", 100, "01-01-2015"),
  (1, "balance", 100, "05-01-2015"),
  (1, "balance", 100, "30-01-2015"),
  (1, "balance", 100, "01-02-2015"),
  (1, "balance", 100, "01-03-2015")
).toDF("ID", "feature", "value", "date")
  .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy")))

df1.join(
  df2, df2("date") < df1("date"), "left"
).show()


+-------------------+---+-------+-----+-------------------+
|               date| ID|feature|value|               date|
+-------------------+---+-------+-----+-------------------+
|2015-01-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-05 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-01-30 00:00:00|
|2015-02-02 00:00:00|  1|balance|  100|2015-02-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-05 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-01-30 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-02-01 00:00:00|
|2015-03-02 00:00:00|  1|balance|  100|2015-03-01 00:00:00|
+-------------------+---+-------+-----+-------------------+

EDIT: to get the number of matching records from df2, do:

df1.join(
  df2, df2("date") < df1("date"), "left"
)
  .groupBy(df1("date"))
  .count
  .orderBy(df1("date"))
  .show

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2015-01-02 00:00:00|    1|
|2015-02-02 00:00:00|    4|
|2015-03-02 00:00:00|    5|
+-------------------+-----+
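The counts above can be sanity-checked against a plain-Scala model of the same logic: for each df1 date, count the df2 rows that are strictly earlier. This is just a sketch of the semantics using ISO-formatted date strings (which sort chronologically as plain strings); the real job stays in Spark.

```scala
// Plain-Scala model of join(df2("date") < df1("date")) + groupBy + count.
// ISO yyyy-MM-dd strings compare correctly under plain string ordering.
val df1Dates = Seq("2015-01-02", "2015-02-02", "2015-03-02")
val df2Dates = Seq("2015-01-01", "2015-01-05", "2015-01-30", "2015-02-01", "2015-03-01")

// For each reference date, count the strictly earlier df2 rows.
val counts = df1Dates.map(d1 => d1 -> df2Dates.count(_ < d1))
// counts: Seq((2015-01-02,1), (2015-02-02,4), (2015-03-02,5))
```

This reproduces the 1/4/5 counts shown in the table above.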

Upvotes: 3

Ramesh Maharjan

Reputation: 41957

If you are looking to compare only one row of df1 with df2's dates, then you should first select the intended row from df1:

val oneRowDF1 = df1.where($"date" === "02-01-2015").select($"date".as("date2"))

then join using the comparison logic you have:

df2.join(oneRowDF1, unix_timestamp(df2("date"), "dd-MM-yyyy") < unix_timestamp(oneRowDF1("date2"), "dd-MM-yyyy"))
    .drop("date2")

which should give you

+---+-------+-----+----------+
|ID |feature|value|date      |
+---+-------+-----+----------+
|1  |balance|100  |01-01-2015|
+---+-------+-----+----------+

Updated

Joins are expensive, as they require shuffling data between executors on different nodes.

You can simply use the filter function as below:

val oneRowDF1 = df1.where($"date" === "02-01-2015").select(unix_timestamp($"date", "dd-MM-yyyy").as("date2"))

df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < oneRowDF1.take(1)(0)(0))
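The idea here, collecting the single threshold value to the driver and then filtering the big dataframe against it, can be modeled in plain Scala. This is only a sketch of the pattern; in real Spark code `take(1)(0)(0)` returns `Any`, so a cast (e.g. to `Long`) may be needed.

```scala
// Plain-Scala model of collect-one-threshold-then-filter (no join, no shuffle
// beyond the filter itself). ISO strings stand in for the unix timestamps.
val threshold = "2015-01-02" // the single date taken from df1
val df2Dates  = Seq("2015-01-01", "2015-01-05", "2015-01-30", "2015-02-01", "2015-03-01")

// Equivalent of df2.filter(date < threshold).
val kept = df2Dates.filter(_ < threshold)
// kept: Seq(2015-01-01)
```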

I hope the answer is helpful

Upvotes: 0
