Reputation: 2473
I have a dataset containing multiple groups. A rank column (run_order) incrementally counts each entry per group. An example of this structure is shown below:
+-----------+---------+---------+
|  equipment|   run_id|run_order|
+-----------+---------+---------+
|          1|430032589|        1|
|          1|430332632|        2|
|          1|430563033|        3|
|          1|430785715|        4|
|          1|431368577|        5|
|          1|431672148|        6|
|          2|435497596|        1|
|          1|435522469|        7|
+-----------+---------+---------+
Each group (equipment) has a different number of runs: above, equipment 1 has 7 runs whilst equipment 2 has 1 run. I would like to select the first and last n runs per equipment. Selecting the first n runs is straightforward:
df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()
The distinct is in the query because each row corresponds to a timestep and logs the sensor readings for that timestep. There are therefore many rows with the same equipment, run_id and run_order, and these should be preserved in the end result, not aggregated.
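Because the number of runs differs per equipment, I don't think I can write an equivalent select query with a fixed where clause to get the last n runs. What I want is something like the following, where total_runs would be the number of runs for that row's equipment:
df.select("equipment", "run_id").distinct().where(df.run_order > total_runs - n).orderBy("equipment").show()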
I can run a groupBy to get the highest run_order for each equipment:
+-----------+--------------+
|  equipment|max(run_order)|
+-----------+--------------+
|          1|             7|
|          2|             1|
+-----------+--------------+
But I am unsure whether I can construct a dynamic where clause from this result, so that I get the last n runs (including all timestep data for each run).
Upvotes: 0
Views: 281
Reputation: 42392
You can use a window function to add a column holding the maximum run_order for each equipment, then filter on that column:
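from pyspark.sql import functions as F, Window

n = 3
# Attach each equipment's highest run_order to every row, then keep the last n runs.
df2 = df.withColumn(
    'max_run',
    F.max('run_order').over(Window.partitionBy('equipment'))
).where(F.col('run_order') > F.col('max_run') - n)  # strict '>' keeps exactly n runs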
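To sanity-check the boundary condition without a Spark session, here is a minimal pure-Python sketch of the same per-group maximum and filter, applied to the sample rows from the question. The helper name last_n_runs is hypothetical and only for illustration. Note that the strict comparison run_order > max_run - n keeps exactly n runs per equipment, whereas >= would keep n + 1.

```python
from collections import defaultdict

# Sample (equipment, run_id, run_order) rows taken from the question.
rows = [
    (1, 430032589, 1),
    (1, 430332632, 2),
    (1, 430563033, 3),
    (1, 430785715, 4),
    (1, 431368577, 5),
    (1, 431672148, 6),
    (2, 435497596, 1),
    (1, 435522469, 7),
]


def last_n_runs(rows, n):
    """Keep rows whose run_order is among the last n runs of their equipment.

    Hypothetical helper: mirrors the window-max-then-filter idea in plain Python.
    """
    # Per-equipment maximum run_order (the role played by the 'max_run' column).
    max_run = defaultdict(int)
    for equipment, _run_id, run_order in rows:
        max_run[equipment] = max(max_run[equipment], run_order)
    # Strict '>' so that exactly n distinct run_order values survive per group.
    return [r for r in rows if r[2] > max_run[r[0]] - n]


result = last_n_runs(rows, 3)
for row in result:
    print(row)
```

Equipment with fewer than n runs (equipment 2 here) simply keeps all of its runs, and duplicate timestep rows for a run would all pass the filter together, since the condition depends only on equipment and run_order.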
Upvotes: 1