kingshark
kingshark

Reputation: 315

PySpark window function mark first row of each partition that meet specific condition

given this dataframe

+--------+------+----------+--------+
|app_id  |order |entry_flag|operator|
+--------+------+----------+--------+
|AP-1    |1     |1         |S       |
|AP-1    |2     |0         |A       |
|AP-2    |3     |0         |S       |
|AP-2    |4     |0         |A       |
|AP-2    |5     |1         |S       |
|AP-2    |6     |0         |S       |
|AP-2    |7     |0         |A       |
|AP-2    |8     |0         |A       |
|AP-2    |9     |1         |A       |
|AP-2    |10    |0         |S       |
+--------+------+----------+--------+

I want to add a new column flag_x, which is boolean type, logic is:

partitioned/grouped by app_id, order by order, when we meet the row with entry_flag = 1, move forward, find first row after it that has entry_flag = 0 and operator = A, mark flag_x = 1, otherwise flag_x = 0

For sample above, the output should be:

+--------+------+----------+--------+------+
|app_id  |order |entry_flag|operator|flag_x|
+--------+------+----------+--------+------+
|AP-1    |1     |1         |S       |0     |
|AP-1    |2     |0         |A       |1     |
|AP-2    |3     |0         |S       |0     |
|AP-2    |4     |0         |A       |0     |
|AP-2    |5     |1         |S       |0     |
|AP-2    |6     |0         |S       |0     |
|AP-2    |7     |0         |A       |1     |
|AP-2    |8     |0         |A       |0     |
|AP-2    |9     |1         |A       |0     |
|AP-2    |10    |0         |S       |0     |
+--------+------+----------+--------+------+

How can we achieve that using PySpark dataframe operations?

Upvotes: 1

Views: 1658

Answers (1)

Kafels
Kafels

Reputation: 4069

Your problem is little hard to do and to explain it I left comments into code:

from pyspark.sql import Row, Window
import pyspark.sql.functions as f


df = spark.createDataFrame([
  Row(app_id='AP-1', order=1, entry_flag=1, operator='S'),
  Row(app_id='AP-1', order=2, entry_flag=0, operator='A'),
  Row(app_id='AP-2', order=3, entry_flag=0, operator='S'),
  Row(app_id='AP-2', order=4, entry_flag=0, operator='A'),
  Row(app_id='AP-2', order=5, entry_flag=1, operator='S'),
  Row(app_id='AP-2', order=6, entry_flag=0, operator='S'),
  Row(app_id='AP-2', order=7, entry_flag=0, operator='A'),
  Row(app_id='AP-2', order=8, entry_flag=0, operator='A'),
  Row(app_id='AP-2', order=9, entry_flag=1, operator='A'),
  Row(app_id='AP-2', order=10, entry_flag=0, operator='S')
])

# Creating a column to group each entry where the value is 1
w_entry = Window.partitionBy('app_id').orderBy('order')
df = df.withColumn('group', f.sum('entry_flag').over(w_entry))

# Applying your boolean rule
df = df.withColumn('match', f.when(f.col('group') > f.lit(0), 
                                   (f.col('entry_flag') == f.lit(0)) & (f.col('operator')== f.lit('A')))
                             .otherwise(f.lit(False)))
# +------+-----+----------+--------+-----+-----+
# |app_id|order|entry_flag|operator|group|match|
# +------+-----+----------+--------+-----+-----+
# |AP-1  |1    |1         |S       |1    |false|
# |AP-1  |2    |0         |A       |1    |true |
# |AP-2  |3    |0         |S       |0    |false|
# |AP-2  |4    |0         |A       |0    |false|
# |AP-2  |5    |1         |S       |1    |false|
# |AP-2  |6    |0         |S       |1    |false|
# |AP-2  |7    |0         |A       |1    |true |
# |AP-2  |8    |0         |A       |1    |true |
# |AP-2  |9    |1         |A       |2    |false|
# |AP-2  |10   |0         |S       |2    |false|
# +------+-----+----------+--------+-----+-----+

# If a group has two or more matches like the example below
# |AP-2  |7    |0         |A       |1    |true |
# |AP-2  |8    |0         |A       |1    |true |
# identify which is the first occurrence and set `flag_x` with 1 to it.

w_flag = Window.partitionBy('app_id', 'group', 'match')
df = df.withColumn('flag_x', (f.col('match') & (f.col('order') == f.min('order').over(w_flag))).cast('int'))

# Drop temporary columns
df = df.drop('group', 'match')

df.show(truncate=False)
# +------+-----+----------+--------+------+
# |app_id|order|entry_flag|operator|flag_x|
# +------+-----+----------+--------+------+
# |AP-1  |1    |1         |S       |0     |
# |AP-1  |2    |0         |A       |1     |
# |AP-2  |3    |0         |S       |0     |
# |AP-2  |4    |0         |A       |0     |
# |AP-2  |5    |1         |S       |0     |
# |AP-2  |6    |0         |S       |0     |
# |AP-2  |7    |0         |A       |1     |
# |AP-2  |8    |0         |A       |0     |
# |AP-2  |9    |1         |A       |0     |
# |AP-2  |10   |0         |S       |0     |
# +------+-----+----------+--------+------+

Upvotes: 1

Related Questions