Aesir

Reputation: 2473

How to set a dynamic where clause using pyspark

I have a dataset within which there are multiple groups. I have a rank column (run_order) which incrementally counts each entry per group. An example of this structure is shown below:

+-----------+---------+---------+
|  equipment|   run_id|run_order|
+-----------+---------+---------+
|1          |430032589|        1|
|1          |430332632|        2|
|1          |430563033|        3|
|1          |430785715|        4|
|1          |431368577|        5|
|1          |431672148|        6|
|2          |435497596|        1|
|1          |435522469|        7|

Each group (equipment) has a different number of runs. As shown above, equipment 1 has 7 runs whilst equipment 2 has 1 run. I would like to select the first and last n runs per equipment. Selecting the first n runs is straightforward:

df.select("equipment", "run_id").distinct().where(df.run_order <= n).orderBy("equipment").show()

The distinct is in the query because each row is equivalent to a timestep, and each row logs the sensor readings associated with that timestep. As a result there are many rows with the same equipment, run_id and run_order, which should be preserved in the end result and not aggregated.
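For example (a minimal sketch with an illustrative value for n and an illustrative variable name), filtering before the distinct keeps every timestep row, and the distinct is only applied for the displayed summary:

from pyspark.sql import functions as F

n = 3  # illustrative value

# Keep all timestep rows for the first n runs of each equipment
first_n = df.where(F.col("run_order") <= n)

# Distinct only for the displayed summary of (equipment, run_id)
first_n.select("equipment", "run_id").distinct().orderBy("equipment").show()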

As the number of runs is unique to each equipment, I can't (I think) do an equivalent select query with a where clause to get the last n runs:

df.select("equipment", "run_id").distinct().where(df.run_order >= total_runs - n).orderBy("equipment").show()

I can run a groupBy to get the highest run_order for each equipment.
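A minimal sketch of that aggregation, using the column names from the sample above:

from pyspark.sql import functions as F

# Highest run_order per equipment
df.groupBy("equipment").agg(F.max("run_order")).orderBy("equipment").show()

which produces: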

+-----------+----------------+
|  equipment| max(run_order) |
+-----------+----------------+
|1          |               7|
|2          |               1|

But I am unsure whether there is a way to construct a dynamic where clause that works like this, so that I get the last n runs (including all timestep data for each run).

Upvotes: 0

Views: 281

Answers (1)

mck

Reputation: 42392

You can add a column with the maximum run_order for each equipment and filter based on that column:

from pyspark.sql import functions as F, Window

n = 3

df2 = df.withColumn(
    'max_run',
    # highest run_order within each equipment group
    F.max('run_order').over(Window.partitionBy('equipment'))
).where(
    # keep only the last n runs of each equipment
    F.col('run_order') > F.col('max_run') - n
)
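df2 keeps all timestep rows for the last n runs. For example (illustrative, using the same column names as in the question), the distinct (equipment, run_id) pairs can then be listed with:

df2.select('equipment', 'run_id').distinct().orderBy('equipment').show()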

Upvotes: 1
