Reputation: 315
Given this dataframe:
+--------+------+----------+--------+
|app_id |order |entry_flag|operator|
+--------+------+----------+--------+
|AP-1 |1 |1 |S |
|AP-1 |2 |0 |A |
|AP-2 |3 |0 |S |
|AP-2 |4 |0 |A |
|AP-2 |5 |1 |S |
|AP-2 |6 |0 |S |
|AP-2 |7 |0 |A |
|AP-2 |8 |0 |A |
|AP-2 |9 |1 |A |
|AP-2 |10 |0 |S |
+--------+------+----------+--------+
I want to add a new column `flag_x` of boolean type. The logic is: partitioned (grouped) by `app_id` and ordered by `order`, when we meet a row with `entry_flag = 1`, move forward and find the first row after it that has `entry_flag = 0` and `operator = A`. Mark that row with `flag_x = 1`; every other row gets `flag_x = 0`.
For the sample above, the output should be:
+--------+------+----------+--------+------+
|app_id |order |entry_flag|operator|flag_x|
+--------+------+----------+--------+------+
|AP-1 |1 |1 |S |0 |
|AP-1 |2 |0 |A |1 |
|AP-2 |3 |0 |S |0 |
|AP-2 |4 |0 |A |0 |
|AP-2 |5 |1 |S |0 |
|AP-2 |6 |0 |S |0 |
|AP-2 |7 |0 |A |1 |
|AP-2 |8 |0 |A |0 |
|AP-2 |9 |1 |A |0 |
|AP-2 |10 |0 |S |0 |
+--------+------+----------+--------+------+
How can we achieve that using PySpark dataframe operations?
Reputation: 4069
Your problem is a little tricky, so I left comments in the code to explain each step:
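from pyspark.sql import Row, SparkSession, Window
import pyspark.sql.functions as f
# Reuse the active session if one exists (e.g. in a notebook or the pyspark shell)
spark = SparkSession.builder.getOrCreate()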
df = spark.createDataFrame([
    Row(app_id='AP-1', order=1, entry_flag=1, operator='S'),
    Row(app_id='AP-1', order=2, entry_flag=0, operator='A'),
    Row(app_id='AP-2', order=3, entry_flag=0, operator='S'),
    Row(app_id='AP-2', order=4, entry_flag=0, operator='A'),
    Row(app_id='AP-2', order=5, entry_flag=1, operator='S'),
    Row(app_id='AP-2', order=6, entry_flag=0, operator='S'),
    Row(app_id='AP-2', order=7, entry_flag=0, operator='A'),
    Row(app_id='AP-2', order=8, entry_flag=0, operator='A'),
    Row(app_id='AP-2', order=9, entry_flag=1, operator='A'),
    Row(app_id='AP-2', order=10, entry_flag=0, operator='S')
])
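# Running sum of entry_flag per app_id: each row gets the number of entries
# (entry_flag = 1) seen so far, so rows following the same entry share a group.
# Rows before the first entry get group 0.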
w_entry = Window.partitionBy('app_id').orderBy('order')
df = df.withColumn('group', f.sum('entry_flag').over(w_entry))
# Applying your boolean rule
df = df.withColumn(
    'match',
    f.when(
        f.col('group') > f.lit(0),
        (f.col('entry_flag') == f.lit(0)) & (f.col('operator') == f.lit('A'))
    ).otherwise(f.lit(False))
)
# +------+-----+----------+--------+-----+-----+
# |app_id|order|entry_flag|operator|group|match|
# +------+-----+----------+--------+-----+-----+
# |AP-1 |1 |1 |S |1 |false|
# |AP-1 |2 |0 |A |1 |true |
# |AP-2 |3 |0 |S |0 |false|
# |AP-2 |4 |0 |A |0 |false|
# |AP-2 |5 |1 |S |1 |false|
# |AP-2 |6 |0 |S |1 |false|
# |AP-2 |7 |0 |A |1 |true |
# |AP-2 |8 |0 |A |1 |true |
# |AP-2 |9 |1 |A |2 |false|
# |AP-2 |10 |0 |S |2 |false|
# +------+-----+----------+--------+-----+-----+
# A group can have two or more matches, as in the example below:
# |AP-2 |7 |0 |A |1 |true |
# |AP-2 |8 |0 |A |1 |true |
# Find the first occurrence (lowest `order`) and set `flag_x` to 1 only for it.
w_flag = Window.partitionBy('app_id', 'group', 'match')
df = df.withColumn(
    'flag_x',
    (f.col('match') & (f.col('order') == f.min('order').over(w_flag))).cast('int')
)
# Drop temporary columns
df = df.drop('group', 'match')
df.show(truncate=False)
# +------+-----+----------+--------+------+
# |app_id|order|entry_flag|operator|flag_x|
# +------+-----+----------+--------+------+
# |AP-1 |1 |1 |S |0 |
# |AP-1 |2 |0 |A |1 |
# |AP-2 |3 |0 |S |0 |
# |AP-2 |4 |0 |A |0 |
# |AP-2 |5 |1 |S |0 |
# |AP-2 |6 |0 |S |0 |
# |AP-2 |7 |0 |A |1 |
# |AP-2 |8 |0 |A |0 |
# |AP-2 |9 |1 |A |0 |
# |AP-2 |10 |0 |S |0 |
# +------+-----+----------+--------+------+
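If you want to sanity-check the result on a small sample, here is a rough plain-Python version of the same rule (no Spark involved). It is only a sketch of how I read the requirement: the helper name `add_flag_x` and the `armed` variable are my own, and it assumes `entry_flag` is always 0 or 1 and that the data fits in memory.

```python
from itertools import groupby
from operator import itemgetter


def add_flag_x(rows):
    """Return copies of `rows` (dicts) with an extra `flag_x` key.

    Hypothetical helper for cross-checking small samples only.
    """
    out = []
    # Same as partitionBy('app_id').orderBy('order') in the Spark version
    ordered = sorted(rows, key=itemgetter("app_id", "order"))
    for _, group in groupby(ordered, key=itemgetter("app_id")):
        armed = False  # True after an entry row, until the first matching row
        for row in group:
            flag = 0
            if row["entry_flag"] == 1:
                armed = True
            elif armed and row["entry_flag"] == 0 and row["operator"] == "A":
                flag = 1
                armed = False  # only the first match after an entry is flagged
            out.append({**row, "flag_x": flag})
    return out


# Sample data from the question: (app_id, order, entry_flag, operator)
sample = [
    dict(zip(("app_id", "order", "entry_flag", "operator"), values))
    for values in [
        ("AP-1", 1, 1, "S"), ("AP-1", 2, 0, "A"),
        ("AP-2", 3, 0, "S"), ("AP-2", 4, 0, "A"),
        ("AP-2", 5, 1, "S"), ("AP-2", 6, 0, "S"),
        ("AP-2", 7, 0, "A"), ("AP-2", 8, 0, "A"),
        ("AP-2", 9, 1, "A"), ("AP-2", 10, 0, "S"),
    ]
]

flags = [row["flag_x"] for row in add_flag_x(sample)]
print(flags)  # [0, 1, 0, 0, 0, 0, 1, 0, 0, 0]
```

The printed flags line up with the `flag_x` column in the expected output. For real data you would still use the window-function version above, since this loop runs on a single machine.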