John Stud

Reputation: 1779

PySpark: Create incrementing group column counter

How can I generate the expected value ExpectedGroup, so that the same value repeats across consecutive rows where cond1 is True, and the counter increments by 1 each time a new run of True rows starts after a False row in cond1?

Consider:

from pyspark.sql import functions as F, Window

df = spark.createDataFrame(sc.parallelize([
            ['A', '2019-01-01', 'P', 'O', 2, None],
            ['A', '2019-01-02', 'O', 'O', 5, 1],
            ['A', '2019-01-03', 'O', 'O', 10, 1],
            ['A', '2019-01-04', 'O', 'P', 4, None],
            ['A', '2019-01-05', 'P', 'P', 300, None],
            ['A', '2019-01-06', 'P', 'O', 2, None],
            ['A', '2019-01-07', 'O', 'O', 5, 2],
            ['A', '2019-01-08', 'O', 'O', 10, 2],
            ['A', '2019-01-09', 'O', 'P', 4, None],
            ['A', '2019-01-10', 'P', 'P', 300, None],
            ['B', '2019-01-01', 'P', 'O', 2, None],
            ['B', '2019-01-02', 'O', 'O', 5, 3],
            ['B', '2019-01-03', 'O', 'O', 10, 3],
            ['B', '2019-01-04', 'O', 'P', 4, None],
            ['B', '2019-01-05', 'P', 'P', 300, None],
            ]),
                           ['ID', 'Time', 'FromState', 'ToState', 'Hours', 'ExpectedGroup'])

# condition statement
cond1 = (df.FromState == 'O') & (df.ToState == 'O')
df = df.withColumn('condition', cond1.cast("int"))
df = df.withColumn('conditionLead', F.lead('condition').over(Window.orderBy('ID', 'Time')))
df = df.na.fill(value=0, subset=["conditionLead"])
df = df.withColumn('finalCondition', ( (F.col('condition') == 1) &  (F.col('conditionLead') == 1)).cast('int'))
# working pandas option:
# cond1 = ( (df.FromState == 'O') & (df.ToState == 'O')  )
# df['ExpectedGroup'] = (cond1.shift(-1) & cond1).cumsum().mask(~cond1)

# other working option:
# cond1 = ( (df.FromState == 'O') & (df.ToState == 'O')  )
# df['ExpectedGroup'] = (cond1.diff()&cond1).cumsum().where(cond1)


# failing here
windowval = (Window.partitionBy('ID').orderBy('Time').rowsBetween(Window.unboundedPreceding, 0))
df = df.withColumn('ExpectedGroup2', F.sum(F.when(cond1, F.col('finalCondition'))).over(windowval))

Upvotes: 0

Views: 123

Answers (2)

lihao

Reputation: 781

Just use the same logic shown in your Pandas code: use the Window lag function to get the previous value of cond1, set a flag to 1 only when the current cond1 is true and the previous cond1 is false, and then do the cumulative sum of that flag over rows where cond1 is true; see the code below. (BTW, you probably want to add ID to the partitionBy clause of the WindowSpec; in that case the last ExpectedGroup1 should be 1 instead of 3.)

from pyspark.sql import functions as F, Window

# single ordered window across all rows (add 'ID' to partitionBy to reset the counter per ID)
w = Window.partitionBy().orderBy('ID', 'Time')

df_new = (df.withColumn('cond1', (F.col('FromState')=='O') & (F.col('ToState')=='O'))
    # flag the start of each run: cond1 is True and the previous cond1 was not
    .withColumn('f', F.when(F.col('cond1') & (~F.lag(F.col('cond1')).over(w)),1).otherwise(0))
    # running sum of the flags, shown only on rows where cond1 is True
    .withColumn('ExpectedGroup1', F.when(F.col('cond1'), F.sum('f').over(w)))
)
df_new.show()
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
| ID|      Time|FromState|ToState|Hours|ExpectedGroup|cond1|  f|ExpectedGroup1|
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
|  A|2019-01-01|        P|      O|    2|         null|false|  0|          null|
|  A|2019-01-02|        O|      O|    5|            1| true|  1|             1|
|  A|2019-01-03|        O|      O|   10|            1| true|  0|             1|
|  A|2019-01-04|        O|      P|    4|         null|false|  0|          null|
|  A|2019-01-05|        P|      P|  300|         null|false|  0|          null|
|  A|2019-01-06|        P|      O|    2|         null|false|  0|          null|
|  A|2019-01-07|        O|      O|    5|            2| true|  1|             2|
|  A|2019-01-08|        O|      O|   10|            2| true|  0|             2|
|  A|2019-01-09|        O|      P|    4|         null|false|  0|          null|
|  A|2019-01-10|        P|      P|  300|         null|false|  0|          null|
|  B|2019-01-01|        P|      O|    2|         null|false|  0|          null|
|  B|2019-01-02|        O|      O|    5|            3| true|  1|             3|
|  B|2019-01-03|        O|      O|   10|            3| true|  0|             3|
|  B|2019-01-04|        O|      P|    4|         null|false|  0|          null|
|  B|2019-01-05|        P|      P|  300|         null|false|  0|          null|
+---+----------+---------+-------+-----+-------------+-----+---+--------------+
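As mentioned above, if the counter should restart for each ID, the only change needed is adding ID to the window's partitionBy. A minimal sketch, assuming the same df; the names w_id and df_per_id are illustrative, and the coalesce guards the first row of each partition, where lag returns null:

w_id = Window.partitionBy('ID').orderBy('Time')

df_per_id = (df
    .withColumn('cond1', (F.col('FromState') == 'O') & (F.col('ToState') == 'O'))
    # flag a group start: cond1 is True and the previous row's cond1 was not
    .withColumn('f', F.when(F.col('cond1') &
                            ~F.coalesce(F.lag('cond1').over(w_id), F.lit(False)), 1).otherwise(0))
    # running sum of the flags within each ID, shown only where cond1 is True
    .withColumn('ExpectedGroup1', F.when(F.col('cond1'), F.sum('f').over(w_id)))
)
df_per_id.show()

With this window, B's first group becomes 1 instead of 3, because the running sum restarts in each ID partition.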

Upvotes: 1

Ahmed Mohamed

Reputation: 936

How to create a group column counter in PySpark?

To create a group column counter in PySpark, we can use a Window specification together with the row_number function. The Window specification defines the partitioning and ordering criteria for the rows, and row_number returns the position of each row within its window.

For example, suppose we have a PySpark DataFrame called df with the following data:

id  name  category
1   A     X
2   B     X
3   C     Y
4   D     Y
5   E     Z
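For completeness, the sample df above can be built like this (a minimal sketch, assuming an active SparkSession named spark):

# Build the sample DataFrame shown above (assumes an active SparkSession `spark`)
df = spark.createDataFrame(
    [(1, "A", "X"), (2, "B", "X"), (3, "C", "Y"), (4, "D", "Y"), (5, "E", "Z")],
    ["id", "name", "category"],
)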

We want to create a new column called group_counter that assigns a number to each row within the same category, starting from 1. To do this, we can use the following code:

# Import the required modules
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Define the window specification
window = Window.partitionBy("category").orderBy("id")

# Create the group column counter
df = df.withColumn("group_counter", row_number().over(window))

# Show the result
df.show()

The output of the code is:

id  name  category  group_counter
1   A     X         1
2   B     X         2
3   C     Y         1
4   D     Y         2
5   E     Z         1

As we can see, the group_counter column increments by 1 for each row within the same category, and resets to 1 when the category changes.

Why is creating a group column counter useful?

Creating a group column counter can be useful for various purposes, such as:

  • Ranking the rows within a group based on some criteria, such as sales, ratings, or popularity.
  • Assigning labels or identifiers to the rows within a group, such as customer segments, product categories, or order numbers.
  • Performing calculations or aggregations based on the group column counter, such as cumulative sums, averages, or percentages (see the sketch after this list).
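For instance, a minimal sketch of the last point, reusing the group_counter built above; category_size and pct_of_category are illustrative column names, not part of the original question:

from pyspark.sql import Window
from pyspark.sql.functions import col, count, lit

# Per-category row count, and the fraction of its category each row represents,
# computed from the group_counter built earlier (illustrative column names)
category_window = Window.partitionBy("category")

df = (df
    .withColumn("category_size", count(lit(1)).over(category_window))
    .withColumn("pct_of_category", col("group_counter") / col("category_size"))
)
df.show()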

Upvotes: 0
