Reputation: 101
Suppose I have the following data frame in Spark Scala:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T20:33:...|2018-01-31T21:18:...|
| 1|2018-01-31T20:35:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:04:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:05:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:15:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:16:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T19:43:...|2018-01-31T20:35:...|
| 2|2018-01-31T19:44:...|2018-01-31T20:35:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
I want to remove the rows where Date < Date_x for each Index, as illustrated below:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
I tried adding a column x_idx using monotonically_increasing_id() and getting min(x_idx) for each Index where Date < Date_x, so that I can subsequently drop the rows that don't satisfy the condition. But it doesn't seem to work for me. I probably misunderstand how agg() works. Thank you for your help!
val test_df = df.withColumn("x_idx", monotonically_increasing_id())
val newIdx = test_df
  .filter($"Date" > "Date_x")
  .groupBy($"Index")
  .agg(min($"x_idx"))
  .toDF("n_Index", "min_x_idx")
newIdx.show
+-------+---------+
|n_Index|min_x_idx|
+-------+---------+
+-------+---------+
Upvotes: 0
Views: 446
Reputation: 41987
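You forgot the $ before "Date_x" in
.filter($"Date" > "Date_x")
Without the $, "Date_x" is a plain string literal rather than a column reference, so every Date is compared against the text "Date_x" instead of the value in the Date_x column. The correct filter is
.filter($"Date" > $"Date_x")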
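To make the difference between a string literal and a column reference concrete, here is a minimal sketch. It assumes a local SparkSession and a few made-up rows with Date and Date_x stored as ISO-8601 strings (the question does not show the real column types); the object name FilterLiteralVsColumn is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

object FilterLiteralVsColumn {
  // Returns (rows kept by the literal comparison, rows kept by the column comparison).
  def counts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Hypothetical sample data: ISO-8601 strings, which sort chronologically as text.
    val df = Seq(
      (1, "2018-01-31T21:16:00", "2018-01-31T21:18:00"),
      (1, "2018-01-31T21:19:00", "2018-01-31T21:18:00"),
      (2, "2018-01-31T20:36:00", "2018-01-31T20:35:00")
    ).toDF("Index", "Date", "Date_x")

    // "Date_x" without $ is a string literal: Date is compared to the text "Date_x".
    val literalCount = df.filter($"Date" > "Date_x").count()

    // $"Date_x" is a column reference: Date is compared to Date_x row by row.
    val columnCount = df.filter($"Date" > $"Date_x").count()

    (literalCount, columnCount)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("literal-vs-column")
      .getOrCreate()
    try println(counts(spark)) finally spark.stop()
  }
}
```

On this sample the literal comparison keeps no rows, because every date string starts with "2", which sorts before "D", while the column comparison keeps the two rows whose Date is later than Date_x.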
You can also alias the columns with as instead of calling toDF:
val newIdx = test_df
  .filter($"Date" > $"Date_x")
  .groupBy($"Index".as("n_Index"))
  .agg(min($"x_idx").as("min_x_idx"))
You should then get the following output:
+-------+---------+
|n_Index|min_x_idx|
+-------+---------+
|1 |6 |
|2 |10 |
+-------+---------+
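If the end goal is only to drop the rows where Date < Date_x, as the question describes, the helper index may not be needed at all: a single column-to-column filter on the original data frame could be enough. The following is a minimal sketch, assuming a local SparkSession and made-up ISO-8601 string values; the names DropEarlierRows, keepOnOrAfter and sample are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object DropEarlierRows {
  // Keeps only the rows whose Date is not earlier than Date_x.
  def keepOnOrAfter(df: DataFrame): DataFrame =
    df.filter(col("Date") >= col("Date_x"))

  // Hypothetical sample data shaped like the frame in the question.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "2018-01-31T21:16:00", "2018-01-31T21:18:00"),
      (1, "2018-01-31T21:19:00", "2018-01-31T21:18:00"),
      (1, "2018-01-31T21:20:00", "2018-01-31T21:18:00"),
      (2, "2018-01-31T19:44:00", "2018-01-31T20:35:00"),
      (2, "2018-01-31T20:36:00", "2018-01-31T20:35:00")
    ).toDF("Index", "Date", "Date_x")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("drop-earlier-rows")
      .getOrCreate()
    try keepOnOrAfter(sample(spark)).show(false) finally spark.stop()
  }
}
```

The sketch uses >= so that rows with Date equal to Date_x are kept, since the question only asks to remove rows where Date < Date_x; switch to > if those rows should be dropped too.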
Upvotes: 1
Reputation: 1131
The filter condition might be filtering out all the records. Print the data frame right after the filter and check that it returns the rows you expect.
// show returns Unit, so there is nothing useful to assign here
test_df
  .filter($"Date" > $"Date_x")
  .show
Upvotes: 0