Reputation: 1945
I would like to apply a groupBy and a subsequent agg function to a PySpark DataFrame, but only to a specific window. This is best illustrated by an example. Suppose that I have a dataset named df:
df.show()
+-----+----------+----------+-------+
|   ID| Timestamp| Condition|  Value|
+-----+----------+----------+-------+
|   z1|         1|         0|     50|
|-----------------------------------|
|   z1|         2|         0|     51|
|   z1|         3|         0|     52|
|   z1|         4|         0|     51|
|   z1|         5|         1|     51|
|   z1|         6|         0|     49|
|   z1|         7|         0|     44|
|   z1|         8|         0|     46|
|-----------------------------------|
|   z1|         9|         0|     48|
|   z1|        10|         0|     42|
+-----+----------+----------+-------+
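(For reproducibility, the example data can be created roughly like this; a minimal sketch, assuming an active SparkSession named spark:)
data = [
    ('z1', 1, 0, 50), ('z1', 2, 0, 51), ('z1', 3, 0, 52), ('z1', 4, 0, 51),
    ('z1', 5, 1, 51), ('z1', 6, 0, 49), ('z1', 7, 0, 44), ('z1', 8, 0, 46),
    ('z1', 9, 0, 48), ('z1', 10, 0, 42),
]
df = spark.createDataFrame(data, ['ID', 'Timestamp', 'Condition', 'Value'])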
In particular, I would like to apply a window of +/- 3 rows around the row where column Condition == 1 (i.e. in this case, row 5). Within that window, as depicted in the above DataFrame, I would like to find the minimum value of column Value and the corresponding value of column Timestamp, thus obtaining:
+----------+----------+
| Min_value| Timestamp|
+----------+----------+
|        44|         7|
+----------+----------+
Does anyone know how this can be tackled?
Many thanks in advance
Marioanzas
Upvotes: 2
Views: 912
Reputation: 42352
You can use a window that spans between 3 preceding and 3 following rows, get the minimum, and filter the condition:
from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'min',
    # min over a struct uses struct ordering: it compares Value first, then Timestamp,
    # so the result holds the smallest Value in the window and its Timestamp
    F.min(
        F.struct('Value', 'Timestamp')
    ).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3, 3))
).filter('Condition = 1').select('min.*')
df2.show()
+-----+---------+
|Value|Timestamp|
+-----+---------+
|   44|        7|
+-----+---------+
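If you also want the first output column to be named Min_value, as in your expected result, you could alias the struct field; a small variation of the above (a sketch, column name is just a label and does not affect the ordering):
df3 = df.withColumn(
    'min',
    F.min(
        F.struct(F.col('Value').alias('Min_value'), 'Timestamp')
    ).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3, 3))
).filter('Condition = 1').select('min.*')
# df3.show() then prints columns Min_value and Timestamp instead of Value and Timestamp.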
Upvotes: 1