user8414391

Reputation: 152

How to perform a multi-row multi-column operation in parallel within PySpark, with minimum loops?

I would like to perform a multi-row, multi-column operation in PySpark with few or no loops. The Spark DataFrame 'df' has the following data:

city    time    temp    humid
NewYork 1500    67      57
NewYork 1600    69      55
NewYork 1700    70      56
Dallas  1500    47      37
Dallas  1600    49      35
Dallas  1700    50      39    

I used 'for' loops, but at the cost of parallelism, and it's not effective:

from pyspark.sql.functions import col, lit

city_list = [i.city for i in df.select('city').distinct().collect()]
metric_cols = ['temp', 'humid']
for city in city_list:
    for metric in metric_cols:
        tempDF = df.filter(col("city") == city)
        metric_values = [i[metric] for i in tempDF.select(metric).collect()]
        time_values = [i['time'] for i in tempDF.select('time').collect()]
        tuples = list(zip(time_values, metric_values))
        newColName = city + metric
        df = df.withColumn(newColName, lit(tuples))

I don't think it's working either.

I expect the output to be:

city    time  temp  humid timetemp                         timehumidity
NewYork 1500  67    57    [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
NewYork 1600  69    55    [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
NewYork 1700  70    56    [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
Dallas  1500  47    37    [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
Dallas  1600  49    35    [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
Dallas  1700  50    39    [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]

or, at the least:

city     timetemp                         timehumidity
NewYork  [(1500,67),(1600,69),(1700,70)]  [(1500,57),(1600,55),(1700,56)]
Dallas   [(1500,47),(1600,49),(1700,50)]  [(1500,37),(1600,35),(1700,39)]

Upvotes: 0

Views: 459

Answers (2)

user8414391

Reputation: 152

Found a solution with higher performance in PySpark:

from pyspark.sql.functions import collect_list, arrays_zip
from pyspark.sql.window import Window

def create_tuples(df):
    mycols = ["temp", "humid"]
    lcols = mycols.copy()
    lcols.append("time")
    # collect each column into a per-city list using a window
    for lcol in lcols:
        df = df.select("*", collect_list(lcol).over(Window.partitionBy("city")).alias(lcol + '_list'))
    # zip the time list with each metric list into (time, metric) pairs
    for mycol in mycols:
        df = df.withColumn(mycol + '_tuple', arrays_zip("time_list", mycol + '_list'))
    return df

tuples_df = create_tuples(df)
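
As a usage sketch (assuming the df from the question): the intermediate *_list helper columns created by the window step can be dropped afterwards, leaving the original columns plus the zipped columns. Note that arrays_zip produces arrays of structs, not Python tuples.

# drop the helper columns, keep only the zipped (time, metric) columns
result_df = tuples_df.drop("time_list", "temp_list", "humid_list")
result_df.show(truncate=False)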

Upvotes: 1

Ala Tarighati

Reputation: 3817

One option is to use the struct function:

import pyspark.sql.functions as F

df.groupby('city').agg(
    F.collect_list(F.struct(F.col('time'), F.col('temp'))).alias('timetemp'),
    F.collect_list(F.struct(F.col('time'), F.col('humid'))).alias('timehumidity')
).show(2, False)

Output:

+-------+------------------------------------+------------------------------------+
|city   |timetemp                            |timehumidity                        |
+-------+------------------------------------+------------------------------------+
|Dallas |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|NewYork|[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
+-------+------------------------------------+------------------------------------+

You can join it with your original dataframe.
If you want to have the results as actual tuples, you might need to write your own UDF; a sketch follows below.
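
A minimal sketch of such a UDF, assuming the grouped result from above is called agg_df (both agg_df and to_tuple_string are hypothetical names, not part of the original code). Since Spark has no native tuple type, the UDF renders each array of (time, value) structs as a Python-style string of tuples:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def to_tuple_string(pairs):
    # each element arrives as a Row of (time, value); render the list as tuple text
    return str([(p[0], p[1]) for p in pairs])

agg_df = agg_df.withColumn('timetemp', to_tuple_string('timetemp')) \
               .withColumn('timehumidity', to_tuple_string('timehumidity'))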


You can also define lists of columns to handle more column sets:

list_1 = ['time']
list_2 = ['temp', 'humid']  # change these accordingly

# one aggregated DataFrame per (x, y) column pair
df_array = [
    df.groupby('city').agg(F.collect_list(F.struct(F.col(x), F.col(y))).alias(x + y))
    for x in list_1 for y in list_2
]
for df_temp in df_array:
    df = df.join(df_temp, on='city', how='left')
df.show(truncate=False)

Output:

+-------+----+----+-----+------------------------------------+------------------------------------+
|city   |time|temp|humid|timetemp                            |timehumid                           |
+-------+----+----+-----+------------------------------------+------------------------------------+
|Dallas |1500|47  |37   |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|Dallas |1600|49  |35   |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|Dallas |1700|50  |39   |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|NewYork|1500|67  |57   |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
|NewYork|1600|69  |55   |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
|NewYork|1700|70  |56   |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
+-------+----+----+-----+------------------------------------+------------------------------------+

Upvotes: 2
