vanja_65

Reputation: 101

Filter rows based on a time stamp in another column Spark Scala

Suppose I have the following data frame in Spark Scala:

 +--------+--------------------+--------------------+
 |Index   |                Date|              Date_x|
 +--------+--------------------+--------------------+
 |       1|2018-01-31T20:33:...|2018-01-31T21:18:...|
 |       1|2018-01-31T20:35:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:04:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:05:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:15:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:16:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:19:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:20:...|2018-01-31T21:18:...|
 |       2|2018-01-31T19:43:...|2018-01-31T20:35:...|
 |       2|2018-01-31T19:44:...|2018-01-31T20:35:...|
 |       2|2018-01-31T20:36:...|2018-01-31T20:35:...|
 +--------+--------------------+--------------------+

I want to remove the rows where Date < Date_x for each Index, as illustrated below:

 +--------+--------------------+--------------------+
 |Index   |                Date|              Date_x|
 +--------+--------------------+--------------------+
 |       1|2018-01-31T21:19:...|2018-01-31T21:18:...|
 |       1|2018-01-31T21:20:...|2018-01-31T21:18:...|
 |       2|2018-01-31T20:36:...|2018-01-31T20:35:...|
 +--------+--------------------+--------------------+

I tried adding a column x_idx with monotonically_increasing_id() and taking min(x_idx) for each Index where Date > Date_x, so that I could subsequently drop the rows that don't satisfy the condition. But it doesn't work: the result is empty. I probably misunderstand how agg() works. Thank you for your help!

  val test_df = df.withColumn("x_idx", monotonically_increasing_id())
  val newIdx = test_df
    .filter($"Date" > "Date_x")
    .groupBy($"Index")
    .agg(min($"x_idx"))
    .toDF("n_Index", "min_x_idx")

  newIdx.show

  +-------+---------+
  |n_Index|min_x_idx|
  +-------+---------+
  +-------+---------+

Upvotes: 0

Views: 446

Answers (2)

Ramesh Maharjan

Reputation: 41987

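You forgot the $ before "Date_x" in

.filter($"Date" > "Date_x")

Without it, "Date_x" is treated as a string literal rather than a column, so no rows match. The correct filter is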

.filter($"Date" > $"Date_x")
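The two filters can be compared side by side on a small sample. This is only a minimal sketch: it assumes a local SparkSession and string-typed columns, and the object name, method name, and the seconds in the sample values are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object LiteralVsColumn {
  // Returns (rows matched by the literal comparison, rows matched by the column comparison).
  def matchCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Hypothetical sample rows; the original values are truncated in the question.
    val df = Seq(
      (1, "2018-01-31T21:16:00", "2018-01-31T21:18:00"),
      (1, "2018-01-31T21:19:00", "2018-01-31T21:18:00")
    ).toDF("Index", "Date", "Date_x")

    // Right-hand side is a plain String, so Date is compared with the text "Date_x".
    val literalCount = df.filter($"Date" > "Date_x").count()

    // Right-hand side is a Column, so Date is compared with Date_x row by row.
    val columnCount = df.filter($"Date" > $"Date_x").count()

    (literalCount, columnCount)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("literal-vs-column")
      .getOrCreate()
    val (literalCount, columnCount) = matchCounts(spark)
    println(s"literal comparison: $literalCount rows, column comparison: $columnCount rows")
    spark.stop()
  }
}
```

With string columns like these, the literal comparison should match nothing, because every date string sorts before "Date_x". With timestamp columns the details may differ, but the comparison would still not be against the Date_x column.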

You can also alias the columns with as instead of calling toDF:

val newIdx = test_df
  .filter($"Date" > $"Date_x")
  .groupBy($"Index".as("n_Index"))
  .agg(min($"x_idx").as("min_x_idx"))

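You should then get output like the following (the exact min_x_idx values depend on how the data is partitioned, since monotonically_increasing_id() does not guarantee consecutive ids):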

+-------+---------+
|n_Index|min_x_idx|
+-------+---------+
|1      |6        |
|2      |10       |
+-------+---------+
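If the goal is just to drop the rows where Date < Date_x, a direct filter on the two columns may be enough, without x_idx or agg(). A minimal sketch, assuming string columns in a sortable ISO-8601 format; the object name, helper name, and the seconds in the sample values are made up for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KeepRowsAfterDateX {
  // Keeps rows whose Date is not earlier than Date_x, i.e. drops rows where Date < Date_x.
  def keepFromDateX(df: DataFrame): DataFrame =
    df.filter(df("Date") >= df("Date_x"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("keep-rows-after-date-x")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows modelled on the question's data frame.
    val df = Seq(
      (1, "2018-01-31T21:16:00", "2018-01-31T21:18:00"),
      (1, "2018-01-31T21:19:00", "2018-01-31T21:18:00"),
      (2, "2018-01-31T19:44:00", "2018-01-31T20:35:00"),
      (2, "2018-01-31T20:36:00", "2018-01-31T20:35:00")
    ).toDF("Index", "Date", "Date_x")

    keepFromDateX(df).show(false)
    spark.stop()
  }
}
```

The filter is evaluated per row, so no grouping by Index is needed. Use > instead of >= if rows where Date equals Date_x should be dropped as well.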

Upvotes: 1

Ravikumar

Reputation: 1131

The filter condition might be filtering out all the records. Print the data frame right after the filter to make sure it behaves as you expect:

 test_df
   .filter($"Date" > $"Date_x")
   .show

Upvotes: 0
