bolla

Reputation: 369

How to average a block of numbers separated by null in pyspark?

Given is the following pyspark dataframe, where the order is given by ID:

ID age
1 null
2 10
3 90
4 null
5 null
6 null
7 20
8 30
9 70
10 null
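
For reproducibility, this input can be built with something along these lines (a minimal sketch, not part of the original question; it assumes an existing SparkSession bound to the name spark):

# Hypothetical reconstruction of the example input; `spark` is assumed
# to be an existing SparkSession.
df = spark.createDataFrame(
    [(1, None), (2, 10), (3, 90), (4, None), (5, None),
     (6, None), (7, 20), (8, 30), (9, 70), (10, None)],
    'ID int, age int'
)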

I need to create the average over blocks of consecutive numbers separated by null. Thus, I aim for the following data frame:

First_ID Last_ID avg_age
2 3 50
7 9 40
My first thought was to build a helper column that numbers the blocks of consecutive non-null values, like this:

ID age non-null-block
1 null null
2 10 1
3 90 1
4 null null
5 null null
6 null null
7 20 2
8 30 2
9 70 2
10 null null

However, creating that block column sounds like the very same problem I already have, which may also be why I got stuck with this.

Any help is appreciated. Thanks.

Upvotes: 1

Views: 159

Answers (1)

mck

Reputation: 42352

You can use lag to check whether the previous row is null: if the previous row is null and the current row is not, flag the current row as the start of a block. A running sum of those flags, ordered by ID, then gives the block number that you wanted.

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'block',
    # flag the start of a block: current age is not null while the previous age is null
    (~F.col('age').isNull() & F.lag('age').over(Window.orderBy('ID')).isNull()).cast('int')
).withColumn(
    'block',
    # running sum of the start flags numbers the blocks; rows with null age stay null
    F.when(~F.col('age').isNull(), F.sum('block').over(Window.orderBy('ID')))
)

df2.show()
+---+----+-----+
| ID| age|block|
+---+----+-----+
|  1|null| null|
|  2|  10|    1|
|  3|  90|    1|
|  4|null| null|
|  5|null| null|
|  6|null| null|
|  7|  20|    2|
|  8|  30|    2|
|  9|  70|    2|
| 10|null| null|
+---+----+-----+

Then you can do the aggregation:

# keep only rows that belong to a block, then aggregate each block
df3 = (df2.filter('block is not null')
          .groupBy('block')
          .agg(F.min('ID').alias('First_ID'), F.max('ID').alias('Last_ID'), F.avg('age').alias('avg_age'))
          .drop('block')
      )

df3.show()
+--------+-------+-------+
|First_ID|Last_ID|avg_age|
+--------+-------+-------+
|       2|      3|   50.0|
|       7|      9|   40.0|
+--------+-------+-------+
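
One small usage note (not from the original answer): groupBy does not guarantee the order of its output rows, so if the blocks should be listed by First_ID you can add an explicit sort before showing or saving the result:

# optional: sort the blocks by their first ID
df3.orderBy('First_ID').show()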

Upvotes: 2
