user2896120

Reputation: 3282

Grouping and aggregating twice

I have a dataframe that looks like this:

City     State     Hour   Score     Percentage
DEN      CO        1      0         0
DEN      CO        1      0         0
DEN      CO        2      2         99
DEN      CO        3      0         0
NYC      NYC       1      0         0
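
For reference, the input dataframe can be reproduced with something like the following (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data copied from the table above
df = spark.createDataFrame(
    [("DEN", "CO", 1, 0, 0),
     ("DEN", "CO", 1, 0, 0),
     ("DEN", "CO", 2, 2, 99),
     ("DEN", "CO", 3, 0, 0),
     ("NYC", "NYC", 1, 0, 0)],
    ["City", "State", "Hour", "Score", "Percentage"],
)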

I want it to look like this:

City     State     total_hours  total_scores  total_perct.   total_volume      
DEN      CO        [1,2,3]      [0,2,0]       [0,99,0]       [2,1,1]
NYC      NYC       [1]          [0]           [0]            [1]

For total_hours I'm just doing a collect_set per City and State. For total_scores I'm doing a collect_set for each specific hour and then collecting the scores across all hours. For example, hour 1 for DEN, CO has two scores, 0 and 0, so I keep only one of them; hour 2 has a single score of 2, so the list becomes [0, 2]. Same thing with total_perct. For total_volume, I take the count for each hour and then do a collect_list over all hours of the same city and state.

This is basically what I'd like to achieve. If I do a groupBy like so:

df.groupBy("city", "state", "hour")
  .agg(collect_set("Hour").alias("total_hours"), collect_set("Score").alias("total_scores"), 
       collect_set("Percentage").alias("total_perct."), count("hour").alias("total_volume"))

I'll get the following dataframe:

City     State     total_hours  total_scores  total_perct.   total_volume 
DEN      CO        [1]          [0]           [0]            2
DEN      CO        [2]          [2]           [99]           1
DEN      CO        [3]          [0]           [0]            1
NYC      NYC       [1]          [0]           [0]            1

I don't understand what to do from here. How can I go from what I have now to the final result? I'm using PySpark.

Upvotes: 1

Views: 376

Answers (2)

Lamanus

Reputation: 13541

Spark < 2.4

You need a UDF to flatten the nested arrays, but it is really slow in this case. :(

import itertools
from pyspark.sql.functions import count, col, collect_list, collect_set, udf
from pyspark.sql.types import ArrayType, IntegerType

# Flatten an array of arrays into a single array. The return type must be declared,
# otherwise the UDF defaults to returning a string.
@udf(ArrayType(IntegerType()))
def flatten(col):
    return list(itertools.chain.from_iterable(col))

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), collect_list(col('Score')).alias('total_scores'), collect_list(col('Percentage')).alias('total_perct'), collect_list(col('total_volume')).alias('total_volume')) \
  .select('City', 'State', 'total_hours', flatten(col('total_scores')).alias('total_scores'), flatten(col('total_perct')).alias('total_perct'), 'total_volume') \
  .show(10, False)

Spark 2.4+

Ok, this works with collect_set, collect_list, and the built-in flatten.

from pyspark.sql.functions import count, col, collect_set, collect_list, flatten

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), flatten(collect_list(col('Score'))).alias('total_scores'), flatten(collect_list(col('Percentage'))).alias('total_perct.'), collect_list(col('total_volume')).alias('total_volume')) \
  .show(10, False)

+----+-----+-----------+------------+------------+------------+
|City|State|total_hours|total_scores|total_perct.|total_volume|
+----+-----+-----------+------------+------------+------------+
|NYC |NYC  |[1]        |[0]         |[0]         |[1]         |
|DEN |CO   |[1, 2, 3]  |[0, 2, 0]   |[0, 99, 0]  |[2, 1, 1]   |
+----+-----+-----------+------------+------------+------------+

If you do not include the orderBy in this step, the order of the resulting lists may be mixed up.
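
If you would rather not rely on that orderBy, one possible alternative (a sketch, not part of the answer above; Spark 2.4+) is to collect one struct per hour, sort the array by Hour, and then extract the fields:

from pyspark.sql import functions as F

# Sketch: collect one struct per hour, sort the array by Hour (the first struct field),
# then pull each field back out, so the list order no longer depends on an earlier orderBy.
(df.groupBy('City', 'State', 'Hour')
   .agg(F.collect_set('Score').alias('Score'),
        F.collect_set('Percentage').alias('Percentage'),
        F.count('Hour').alias('total_volume'))
   .groupBy('City', 'State')
   .agg(F.sort_array(F.collect_list(F.struct('Hour', 'Score', 'Percentage', 'total_volume'))).alias('rows'))
   .select('City', 'State',
           F.col('rows.Hour').alias('total_hours'),
           F.flatten(F.col('rows.Score')).alias('total_scores'),
           F.flatten(F.col('rows.Percentage')).alias('total_perct'),
           F.col('rows.total_volume').alias('total_volume'))
   .show(10, False))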

Upvotes: 1

anky

Reputation: 75080

Another way: use a window to count appearances of Hour, keep one row per partition by filtering on an index (idx), and then groupBy + collect_list.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("City","State","Hour")
l = ['Hour','Score', 'Percentage', 'Volume']

(df.withColumn("idx",F.monotonically_increasing_id()).select("*",
            F.count("Hour").over(w).alias("Volume"),F.max("idx").over(w).alias("Indx"))
            .filter(F.col("idx")==F.col("Indx")).orderBy("idx").groupBy("City","State")
            .agg(*[F.collect_list(i).alias(f"total_{i}") for i in l])).show()

Output:

+----+-----+----------+-----------+----------------+------------+
|City|State|total_Hour|total_Score|total_Percentage|total_Volume|
+----+-----+----------+-----------+----------------+------------+
| NYC|  NYC|       [1]|        [0]|             [0]|         [1]|
| DEN|   CO| [1, 2, 3]|  [0, 2, 0]|      [0, 99, 0]|   [2, 1, 1]|
+----+-----+----------+-----------+----------------+------------+
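
As a side note (a sketch, not part of the answer): the same "keep one row per (City, State, Hour)" step can also be written with row_number, which works here because the duplicate rows within an hour carry identical values:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("City", "State", "Hour")
cols = ["Hour", "Score", "Percentage", "Volume"]

# Keep one arbitrary row per (City, State, Hour); safe in this example because the
# duplicates have identical Score/Percentage. Then aggregate exactly as in the answer.
(df.withColumn("Volume", F.count("Hour").over(w))
   .withColumn("rn", F.row_number().over(w.orderBy("Score")))
   .filter(F.col("rn") == 1)
   .orderBy("City", "State", "Hour")
   .groupBy("City", "State")
   .agg(*[F.collect_list(c).alias(f"total_{c}") for c in cols])
   .show())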

Upvotes: 1
