Reputation: 3282
I have a dataframe that looks like this:
City State Hour Score Percentage
DEN  CO    1    0     0
DEN  CO    1    0     0
DEN  CO    2    2     99
DEN  CO    3    0     0
NYC  NYC   1    0     0
I want it to look like this:
City State total_hours total_scores total_perct. total_volume
DEN  CO    [1,2,3]     [0,2,0]      [0,99,0]     [2,1,1]
NYC  NYC   [1]         [0]          [0]          [1]
For total_hours, I'm just doing a collect_set of Hour per City and State. For total_scores, I'm doing a collect_set for each specific hour and then collecting the scores across all hours. For example, hour 1 for DEN/CO has two scores, 0 and 0, so I only take one of them; hour 2 has a single score of 2, so it becomes [0, 2]. Same thing with total_perct. For total_volume, I take the count of rows for each hour and then do a collect_list over all hours of the same city and state.
This is basically what I'd like to achieve. If I do a groupBy like so:
df.groupBy("City", "State", "Hour") \
  .agg(collect_set("Hour").alias("total_hours"),
       collect_set("Score").alias("total_scores"),
       collect_set("Percentage").alias("total_perct."),
       count("Hour").alias("total_volume"))
I'll get the following dataframe:
City State total_hours total_scores total_perct. total_volume
DEN  CO    [1]         [0]          [0]          2
DEN  CO    [2]         [2]          [99]         1
DEN  CO    [3]         [0]          [0]          1
NYC  NYC   [1]         [0]          [0]          1
I don't understand where to go from here. How can I get from what I have now to the final result? I'm using PySpark.
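For reference, the sample dataframe can be built like this (a minimal sketch, assuming an active SparkSession named spark):
# Reproduces the input dataframe from the question
df = spark.createDataFrame(
    [('DEN', 'CO', 1, 0, 0),
     ('DEN', 'CO', 1, 0, 0),
     ('DEN', 'CO', 2, 2, 99),
     ('DEN', 'CO', 3, 0, 0),
     ('NYC', 'NYC', 1, 0, 0)],
    ['City', 'State', 'Hour', 'Score', 'Percentage'])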
Upvotes: 1
Views: 376
Reputation: 13541
Spark < 2.4
You need a udf to flatten the nested lists, but Python udfs are really slow in this case, since every row has to be serialized between the JVM and a Python worker. :(
import itertools
from pyspark.sql.functions import count, col, collect_list, collect_set, udf
from pyspark.sql.types import ArrayType, IntegerType

# Flattens an array of arrays, e.g. [[0], [2], [0]] -> [0, 2, 0].
# The return type must be declared, otherwise the udf defaults to StringType.
@udf(ArrayType(IntegerType()))
def flatten(col):
    return list(itertools.chain.from_iterable(col))
df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'),
       collect_set(col('Percentage')).alias('Percentage'),
       count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'),
       collect_list(col('Score')).alias('total_scores'),
       collect_list(col('Percentage')).alias('total_perct'),
       collect_list(col('total_volume')).alias('total_volume')) \
  .select('City', 'State', 'total_hours',
          flatten(col('total_scores')).alias('total_scores'),
          flatten(col('total_perct')).alias('total_perct'),
          'total_volume') \
  .show(10, False)
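As a quick sanity check of the udf, you can flatten a literal nested array (a sketch with made-up data, not part of the original pipeline):
from pyspark.sql.functions import array, lit

# array(array(0), array(2), array(0)) builds the nested column [[0], [2], [0]]
spark.range(1) \
    .select(flatten(array(array(lit(0)), array(lit(2)), array(lit(0)))).alias('flat')) \
    .show()
# -> [0, 2, 0]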
Spark 2.4+
Ok, this works with collect_set and collect_list.
from pyspark.sql.functions import count, col, collect_list, collect_set, flatten
df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'),
       collect_set(col('Percentage')).alias('Percentage'),
       count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'),
       flatten(collect_list(col('Score'))).alias('total_scores'),
       flatten(collect_list(col('Percentage'))).alias('total_perct.'),
       collect_list(col('total_volume')).alias('total_volume')) \
  .show(10, False)
+----+-----+-----------+------------+------------+------------+
|City|State|total_hours|total_scores|total_perct.|total_volume|
+----+-----+-----------+------------+------------+------------+
|NYC |NYC |[1] |[0] |[0] |[1] |
|DEN |CO |[1, 2, 3] |[0, 2, 0] |[0, 99, 0] |[2, 1, 1] |
+----+-----+-----------+------------+------------+------------+
If you leave out the orderBy at this step, the order of the result lists will be mixed.
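Strictly speaking, an orderBy before a groupBy is not guaranteed to survive the shuffle. If you want the ordering to be deterministic by construction, one option (a sketch, not part of the original answer; Spark 2.4+ because of flatten) is to collect structs keyed by Hour and sort the collected array:
from pyspark.sql.functions import col, collect_list, collect_set, count, flatten, sort_array, struct

(df.groupBy('City', 'State', 'Hour')
   .agg(collect_set('Score').alias('Score'),
        collect_set('Percentage').alias('Percentage'),
        count('Hour').alias('total_volume'))
   .groupBy('City', 'State')
   # sort_array orders the structs by their first field, i.e. by Hour
   .agg(sort_array(collect_list(struct('Hour', 'Score', 'Percentage', 'total_volume'))).alias('rows'))
   .select('City', 'State',
           col('rows.Hour').alias('total_hours'),
           flatten(col('rows.Score')).alias('total_scores'),
           flatten(col('rows.Percentage')).alias('total_perct.'),
           col('rows.total_volume').alias('total_volume'))
   .show(10, False))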
Upvotes: 1
Reputation: 75080
Another way: use a window to count the appearances of each Hour, then keep one row per window partition by filtering on an index (idx), and finally groupBy + collect_list.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("City", "State", "Hour")
l = ['Hour', 'Score', 'Percentage', 'Volume']

(df.withColumn("idx", F.monotonically_increasing_id())
   # Volume = number of rows per (City, State, Hour); Indx = highest idx in that window
   .select("*", F.count("Hour").over(w).alias("Volume"),
           F.max("idx").over(w).alias("Indx"))
   # keep exactly one row per (City, State, Hour)
   .filter(F.col("idx") == F.col("Indx"))
   .orderBy("idx")
   .groupBy("City", "State")
   .agg(*[F.collect_list(i).alias(f"total_{i}") for i in l])
   .show())
Output:
+----+-----+----------+-----------+----------------+------------+
|City|State|total_Hour|total_Score|total_Percentage|total_Volume|
+----+-----+----------+-----------+----------------+------------+
| NYC| NYC| [1]| [0]| [0]| [1]|
| DEN| CO| [1, 2, 3]| [0, 2, 0]| [0, 99, 0]| [2, 1, 1]|
+----+-----+----------+-----------+----------------+------------+
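To see what the filter keeps before the final groupBy, you can show the intermediate rows (a sketch; the helper columns idx and Indx are arbitrary ids, so they are dropped here):
(df.withColumn("idx", F.monotonically_increasing_id())
   .select("*", F.count("Hour").over(w).alias("Volume"),
           F.max("idx").over(w).alias("Indx"))
   .filter(F.col("idx") == F.col("Indx"))
   .select("City", "State", "Hour", "Score", "Percentage", "Volume")
   .show())
This leaves one row per (City, State, Hour), carrying the per-hour Volume count.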
Upvotes: 1