Reputation: 1779
How can I generate the expected value, ExpectedGroup, so that the value stays the same while cond1 is True, but increments by 1 each time we run into a False row in cond1?
Consider:
from pyspark.sql import functions as F, Window

df = spark.createDataFrame(sc.parallelize([
['A', '2019-01-01', 'P', 'O', 2, None],
['A', '2019-01-02', 'O', 'O', 5, 1],
['A', '2019-01-03', 'O', 'O', 10, 1],
['A', '2019-01-04', 'O', 'P', 4, None],
['A', '2019-01-05', 'P', 'P', 300, None],
['A', '2019-01-06', 'P', 'O', 2, None],
['A', '2019-01-07', 'O', 'O', 5, 2],
['A', '2019-01-08', 'O', 'O', 10, 2],
['A', '2019-01-09', 'O', 'P', 4, None],
['A', '2019-01-10', 'P', 'P', 300, None],
['B', '2019-01-01', 'P', 'O', 2, None],
['B', '2019-01-02', 'O', 'O', 5, 3],
['B', '2019-01-03', 'O', 'O', 10, 3],
['B', '2019-01-04', 'O', 'P', 4, None],
['B', '2019-01-05', 'P', 'P', 300, None],
]),
['ID', 'Time', 'FromState', 'ToState', 'Hours', 'ExpectedGroup'])
# condition statement
cond1 = (df.FromState == 'O') & (df.ToState == 'O')
df = df.withColumn('condition', cond1.cast("int"))
df = df.withColumn('conditionLead', F.lead('condition').over(Window.orderBy('ID', 'Time')))
df = df.na.fill(value=0, subset=["conditionLead"])
df = df.withColumn('finalCondition', ( (F.col('condition') == 1) & (F.col('conditionLead') == 1)).cast('int'))
# working pandas option:
# cond1 = ( (df.FromState == 'O') & (df.ToState == 'O') )
# df['ExpectedGroup'] = (cond1.shift(-1) & cond1).cumsum().mask(~cond1)
# other working option:
# cond1 = ( (df.FromState == 'O') & (df.ToState == 'O') )
# df['ExpectedGroup'] = (cond1.diff()&cond1).cumsum().where(cond1)
# failing here
windowval = (Window.partitionBy('ID').orderBy('Time').rowsBetween(Window.unboundedPreceding, 0))
df = df.withColumn('ExpectedGroup2', F.sum(F.when(cond1, F.col('finalCondition'))).over(windowval))
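For reference, the Pandas options from the comments above can be run standalone like this (a minimal sketch on the same sample rows; pdf and rows are just illustrative names):
import pandas as pd

# same rows as the Spark DataFrame above, without the ExpectedGroup column
rows = [
    ['A', '2019-01-01', 'P', 'O', 2], ['A', '2019-01-02', 'O', 'O', 5],
    ['A', '2019-01-03', 'O', 'O', 10], ['A', '2019-01-04', 'O', 'P', 4],
    ['A', '2019-01-05', 'P', 'P', 300], ['A', '2019-01-06', 'P', 'O', 2],
    ['A', '2019-01-07', 'O', 'O', 5], ['A', '2019-01-08', 'O', 'O', 10],
    ['A', '2019-01-09', 'O', 'P', 4], ['A', '2019-01-10', 'P', 'P', 300],
    ['B', '2019-01-01', 'P', 'O', 2], ['B', '2019-01-02', 'O', 'O', 5],
    ['B', '2019-01-03', 'O', 'O', 10], ['B', '2019-01-04', 'O', 'P', 4],
    ['B', '2019-01-05', 'P', 'P', 300],
]
pdf = pd.DataFrame(rows, columns=['ID', 'Time', 'FromState', 'ToState', 'Hours'])

cond1 = (pdf.FromState == 'O') & (pdf.ToState == 'O')
# first option: flag rows where the current and the next row both satisfy cond1,
# take the running total, and blank out rows where cond1 is False
pdf['ExpectedGroup'] = (cond1.shift(-1) & cond1).cumsum().mask(~cond1)
# second option: flag rows where cond1 changes to True
# pdf['ExpectedGroup'] = (cond1.diff() & cond1).cumsum().where(cond1)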
Upvotes: 0
Views: 123
Reputation: 781
Just use the same logic shown in your Pandas code: use the Window lag function to get the previous value of cond1, set the flag to 1 only when the current cond1 is true and the previous cond1 is false, and then do the cumsum of that flag, keeping it only where cond1 is true. See the code below. (BTW, you probably want to add ID to the partitionBy clause of the WindowSpec; in that case the last ExpectedGroup1 values would be 1 instead of 3. A sketch of that variant follows the output below.)
from pyspark.sql import functions as F, Window
w = Window.partitionBy().orderBy('ID', 'Time')

df_new = (df.withColumn('cond1', (F.col('FromState') == 'O') & (F.col('ToState') == 'O'))
    # f = 1 when cond1 is true on this row but was false on the previous row
    .withColumn('f', F.when(F.col('cond1') & (~F.lag(F.col('cond1')).over(w)), 1).otherwise(0))
    # running sum of f, kept only on rows where cond1 is true (null otherwise)
    .withColumn('ExpectedGroup1', F.when(F.col('cond1'), F.sum('f').over(w)))
)
df_new.show()
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
| ID| Time|FromState|ToState|Hours|ExpectedGroup|cond1| f|ExpectedGroup1|
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
| A|2019-01-01| P| O| 2| null|false| 0| null|
| A|2019-01-02| O| O| 5| 1| true| 1| 1|
| A|2019-01-03| O| O| 10| 1| true| 0| 1|
| A|2019-01-04| O| P| 4| null|false| 0| null|
| A|2019-01-05| P| P| 300| null|false| 0| null|
| A|2019-01-06| P| O| 2| null|false| 0| null|
| A|2019-01-07| O| O| 5| 2| true| 1| 2|
| A|2019-01-08| O| O| 10| 2| true| 0| 2|
| A|2019-01-09| O| P| 4| null|false| 0| null|
| A|2019-01-10| P| P| 300| null|false| 0| null|
| B|2019-01-01| P| O| 2| null|false| 0| null|
| B|2019-01-02| O| O| 5| 3| true| 1| 3|
| B|2019-01-03| O| O| 10| 3| true| 0| 3|
| B|2019-01-04| O| P| 4| null|false| 0| null|
| B|2019-01-05| P| P| 300| null|false| 0| null|
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
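As noted above, adding ID to the partitionBy clause makes the counter restart for each ID, so the two B rows get ExpectedGroup1 = 1 instead of 3. A minimal sketch of that variant (same logic, only the WindowSpec changes; w_id and df_new_per_id are just illustrative names):
w_id = Window.partitionBy('ID').orderBy('Time')

df_new_per_id = (df.withColumn('cond1', (F.col('FromState') == 'O') & (F.col('ToState') == 'O'))
    .withColumn('f', F.when(F.col('cond1') & (~F.lag(F.col('cond1')).over(w_id)), 1).otherwise(0))
    .withColumn('ExpectedGroup1', F.when(F.col('cond1'), F.sum('f').over(w_id)))
)
df_new_per_id.show()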
Upvotes: 1
Reputation: 936
To create a group column counter in PySpark, we can use a Window specification together with the row_number function. The Window specification lets us define partitioning and ordering criteria for the rows, and row_number returns the position of each row within its window.
For example, suppose we have a PySpark DataFrame called df with the following data:
| id | name | category |
|----|------|----------|
| 1  | A    | X        |
| 2  | B    | X        |
| 3  | C    | Y        |
| 4  | D    | Y        |
| 5  | E    | Z        |
We want to create a new column called group_counter that assigns a number to each row within the same category, starting from 1. To do this, we can use the following code:
# Import the required modules
from pyspark.sql import Window
from pyspark.sql.functions import row_number
# Define the window specification
window = Window.partitionBy("category").orderBy("id")
# Create the group column counter
df = df.withColumn("group_counter", row_number().over(window))
# Show the result
df.show()
The output of the code is:
| id | name | category | group_counter |
|----|------|----------|---------------|
| 1  | A    | X        | 1             |
| 2  | B    | X        | 2             |
| 3  | C    | Y        | 1             |
| 4  | D    | Y        | 2             |
| 5  | E    | Z        | 1             |
As we can see, the group_counter column increments by 1 for each row within the same category, and resets to 1 when the category changes.
Creating a group column counter like this can be useful for various purposes.
Upvotes: 0