Reputation: 152
I would like to perform a multi-row, multi-column operation in PySpark with few or no loops. The Spark DataFrame 'df' has the following data
city time temp humid
NewYork 1500 67 57
NewYork 1600 69 55
NewYork 1700 70 56
Dallas 1500 47 37
Dallas 1600 49 35
Dallas 1700 50 39
I used 'for' loops, but they come at the cost of parallelism and are not effective.
city_list = [i.city for i in df.select('city').distinct().collect()]
metric_cols = ['temp', 'humid']
for city in city_list:
    for metric in metric_cols:
        tempDF = df.filter(col("city") == city)
        metric_values = [i[metric] for i in tempDF.select(metric).collect()]
        time_values = [i['time'] for i in tempDF.select('time').collect()]
        tuples = list(zip(time_values, metric_values))
        newColName = city + metric
        df = df.withColumn(newColName, lit(tuples))
I don't think it is working either.
I expect the output to be
city time temp humid timetemp timehumidity
NewYork 1500 67 57 [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
NewYork 1600 69 55 [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
NewYork 1700 70 56 [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
Dallas 1500 47 37 [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
Dallas 1600 49 35 [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
Dallas 1700 50 39 [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
or at the least
city timetemp timehumidity
NewYork [(1500,67),(1600,69),(1700,70)] [(1500,57),(1600,55),(1700,56)]
Dallas [(1500,47),(1600,49),(1700,50)] [(1500,37),(1600,35),(1700,39)]
Upvotes: 0
Views: 459
Reputation: 152
Found a solution with better performance in PySpark:
from pyspark.sql.functions import collect_list, arrays_zip
from pyspark.sql.window import Window

def create_tuples(df):
    mycols = ["temp", "humid"]
    lcols = mycols.copy()
    lcols.append("time")
    # Collect each column into a per-city list
    for lcol in lcols:
        df = df.select("*", collect_list(lcol).over(Window.partitionBy("city")).alias(lcol + '_list'))
    # Zip the time list with each metric list into (time, metric) pairs
    for mycol in mycols:
        df = df.withColumn(mycol + '_tuple', arrays_zip("time_list", mycol + '_list'))
    return df

tuples_df = create_tuples(df)
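For context, a minimal usage sketch against the sample data from the question; the SparkSession setup is an assumption and not part of the original snippet.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question
data = [
    ("NewYork", 1500, 67, 57), ("NewYork", 1600, 69, 55), ("NewYork", 1700, 70, 56),
    ("Dallas", 1500, 47, 37), ("Dallas", 1600, 49, 35), ("Dallas", 1700, 50, 39),
]
df = spark.createDataFrame(data, ["city", "time", "temp", "humid"])

# Every row now also carries the per-city (time, temp) and (time, humid) pairs
tuples_df = create_tuples(df)
tuples_df.select("city", "temp_tuple", "humid_tuple").show(truncate=False)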
Upvotes: 1
Reputation: 3817
One option is to use the struct function:
import pyspark.sql.functions as F

df.groupby('city').agg(
    F.collect_list(F.struct(F.col('time'), F.col('temp'))).alias('timetemp'),
    F.collect_list(F.struct(F.col('time'), F.col('humid'))).alias('timehumidity')
).show(2, False)
Output:
+-------+------------------------------------+------------------------------------+
|city |timetemp |timehumidity |
+-------+------------------------------------+------------------------------------+
|Dallas |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|NewYork|[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
+-------+------------------------------------+------------------------------------+
You can join it with your original dataframe.
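A minimal sketch of that join, assuming the aggregated result above is kept in a variable (the name agg_df is mine, not part of the original answer):

agg_df = df.groupby('city').agg(
    F.collect_list(F.struct(F.col('time'), F.col('temp'))).alias('timetemp'),
    F.collect_list(F.struct(F.col('time'), F.col('humid'))).alias('timehumidity')
)

# Left join brings the per-city lists back onto every original row
df_joined = df.join(agg_df, on='city', how='left')
df_joined.show(truncate=False)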
If you want to have the results as tuples, you might need to write your own udf.
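For example, one possible sketch of such a udf, under the assumption that "tuples" means a string rendering of Python tuples (Spark columns cannot hold Python tuples natively, only structs):

from pyspark.sql.types import StringType

# Hypothetical helper: formats an array of (time, value) structs as
# a string like "[(1500, 67), (1600, 69), (1700, 70)]"
@F.udf(returnType=StringType())
def to_tuple_string(pairs):
    return str([(p[0], p[1]) for p in pairs])

agg_df.withColumn('timetemp_str', to_tuple_string(F.col('timetemp'))).show(truncate=False)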
You can also define lists of columns and handle more column sets:
list_1 = ['time']
list_2 = ['temp', 'humid']  # change these accordingly

df_array = [
    df.groupby('city').agg(F.collect_list(F.struct(F.col(x), F.col(y))).alias(x + y))
    for x in list_1 for y in list_2
]

for df_temp in df_array:
    df = df.join(df_temp, on='city', how='left')

df.show(truncate=False)
Output:
+-------+----+----+-----+------------------------------------+------------------------------------+
|city |time|temp|humid|timetemp |timehumid |
+-------+----+----+-----+------------------------------------+------------------------------------+
|Dallas |1500|47 |37 |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|Dallas |1600|49 |35 |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|Dallas |1700|50 |39 |[[1500, 47], [1600, 49], [1700, 50]]|[[1500, 37], [1600, 35], [1700, 39]]|
|NewYork|1500|67 |57 |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
|NewYork|1600|69 |55 |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
|NewYork|1700|70 |56 |[[1500, 67], [1600, 69], [1700, 70]]|[[1500, 57], [1600, 55], [1700, 56]]|
+-------+----+----+-----+------------------------------------+------------------------------------+
Upvotes: 2