John Stud

Reputation: 1779

PySpark: Incremental Row Counter

I am having difficulty implementing this existing answer: PySpark - get row number for each row in a group

Consider the following:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# create df
df = spark.createDataFrame(sc.parallelize([
    [1, 'A', 20220722, 1],
    [1, 'A', 20220723, 1],
    [1, 'B', 20220724, 2],
    [2, 'B', 20220722, 1],
    [2, 'C', 20220723, 2],
    [2, 'B', 20220724, 3],
]), ['ID', 'State', 'Time', 'Expected'])

# rank
w = Window.partitionBy('State').orderBy('ID', 'Time')
df = df.withColumn('rn', F.row_number().over(w))
df = df.withColumn('rank', F.rank().over(w))
df = df.withColumn('dense', F.dense_rank().over(w))

# view
df.show()
+---+-----+--------+--------+---+----+-----+
| ID|State|    Time|Expected| rn|rank|dense|
+---+-----+--------+--------+---+----+-----+
|  1|    A|20220722|       1|  1|   1|    1|
|  1|    A|20220723|       1|  2|   2|    2|
|  1|    B|20220724|       2|  1|   1|    1|
|  2|    B|20220722|       1|  2|   2|    2|
|  2|    B|20220724|       3|  3|   3|    3|
|  2|    C|20220723|       2|  1|   1|    1|
+---+-----+--------+--------+---+----+-----+

How can I reproduce the Expected column, with the dates sorted in ascending order?

Upvotes: 0

Views: 64

Answers (1)

samkart

Reputation: 6644

Your count restarts for each new id value, which means the id field is your partition field, not state.
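To see why a plain ranking function still isn't enough even with the right partition, here is a minimal check reusing the df and imports from the question (the w_id and rn_id names are just for illustration):

# sanity check (question's df): row_number per ID, ordered by time
w_id = Window.partitionBy('ID').orderBy('Time')
df.withColumn('rn_id', F.row_number().over(w_id)).show()
# ID 1 gets rn_id = 1, 2, 3 while Expected is 1, 1, 2: the counter
# should only advance when State changes between consecutive rows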

Here's an approach using a running sum window function.

import sys

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('st_notsame',
               # 1 when state differs from the previous row in the id (or there is no previous row), else 0
               func.coalesce(func.col('state') != func.lag('state').over(wd.partitionBy('id').orderBy('time')), func.lit(True)).cast('int')
               ). \
    withColumn('rank',
               # running sum of the change flags produces the incremental counter
               func.sum('st_notsame').over(wd.partitionBy('id').orderBy('time', 'state').rowsBetween(-sys.maxsize, 0))
               ). \
    show()

# +---+-----+--------+--------+----------+----+
# | id|state|    time|expected|st_notsame|rank|
# +---+-----+--------+--------+----------+----+
# |  1|    A|20220722|       1|         1|   1|
# |  1|    A|20220723|       1|         0|   1|
# |  1|    B|20220724|       2|         1|   2|
# |  2|    B|20220722|       1|         1|   1|
# |  2|    C|20220723|       2|         1|   2|
# |  2|    B|20220724|       3|         1|   3|
# +---+-----+--------+--------+----------+----+
  • first, flag every row whose state differs from the previous row's state within the same id as 1, and consecutive repeats as 0; this is what makes a running sum possible
  • then take a running sum of that flag with an infinite lookback per id to get your desired ranking (an equivalent form using named frame constants is sketched below)
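As a side note, the same infinite lookback can be written with the Window class's named constants instead of -sys.maxsize. A minimal equivalent sketch under the same setup (w_flag and w_sum are illustrative names):

# equivalent variant: named frame constants in place of -sys.maxsize
w_flag = wd.partitionBy('id').orderBy('time')
w_sum = wd.partitionBy('id').orderBy('time', 'state') \
          .rowsBetween(wd.unboundedPreceding, wd.currentRow)

data_sdf. \
    withColumn('st_notsame',
               func.coalesce(func.col('state') != func.lag('state').over(w_flag), func.lit(True)).cast('int')). \
    withColumn('rank', func.sum('st_notsame').over(w_sum)). \
    show()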

Upvotes: 1
