Reputation: 731
How do I find the id of the row with the maximum value of another column, plus that column's average, using a PySpark DataFrame?
df:
+-------------+-------+----------+---------+----------+--------+------+------------------+
|ChargingEvent| CPID| StartDate|StartTime| EndDate| EndTime|Energy| PluginDuration|
+-------------+-------+----------+---------+----------+--------+------+------------------+
| 16673806|AN11719|2017-12-31| 14:46:00|2017-12-31|18:00:00| 2.4|3.2333333333333334|
| 16670986|AN01706|2017-12-31| 11:25:00|2017-12-31|13:14:00| 6.1|1.8166666666666667|
| 3174961|AN18584|2017-12-31| 11:26:11|2018-01-01|12:54:11| 24|25.466666666666665|
Current code:
df.agg({'PluginDuration': 'max'}).show()
df.agg({'PluginDuration': 'avg'}).show()
Then rename the columns to match the expected outcome below:
+-------+---------+---------+
|id     |max_value|avg_value|
+-------+---------+---------+
|QWER   |96.26    |12.35    |
+-------+---------+---------+
id is renamed from CPID, and max_value and avg_value need to be rounded to 2 decimal places.
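As a sanity check, you can work out the expected row by hand for just the three sample rows shown above. Here is a minimal plain-Python sketch. The rows list copies only the CPID and PluginDuration values from the sample, and this is not Spark code.

```python
# CPID and PluginDuration copied from the three sample rows above
rows = [
    ("AN11719", 3.2333333333333334),
    ("AN01706", 1.8166666666666667),
    ("AN18584", 25.466666666666665),
]

# id of the row with the largest PluginDuration
top_id, max_value = max(rows, key=lambda r: r[1])
# plain arithmetic mean of PluginDuration
avg_value = sum(d for _, d in rows) / len(rows)

print(top_id, round(max_value, 2), round(avg_value, 2))
```

Python's built-in round rounds exact halves to the nearest even digit. Spark's F.round uses HALF_UP rounding instead, so the two can disagree on values that fall exactly on a rounding boundary.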
Upvotes: 0
Reputation: 1
def extract(self):
    # Read the CSV using its header row; without a schema every column is a string
    df = self.spark_session.read.csv(self.input_path, header=True)
    return df

def transform(self, df):
    # Import the functions module under an alias so Python's built-in max/sum are not shadowed
    from pyspark.sql import functions as F
    # Cast PluginDuration to double so max/avg are computed numerically
    df1 = df.withColumn('PluginDuration', F.col('PluginDuration').cast('double'))
    # Max and average PluginDuration per charge point
    df2 = df1.groupBy('CPID') \
        .agg(F.max('PluginDuration').alias('max_duration'),
             F.avg('PluginDuration').alias('avg_duration'))
    # Rename CPID and round both aggregates to 2 decimal places
    df3 = df2.select(F.col('CPID').alias('chargepoint_id'),
                     F.round(df2['max_duration'], 2).alias('max_duration'),
                     F.round(df2['avg_duration'], 2).alias('avg_duration'))
    return df3

def load(self, df):
    # Write the result out as Parquet
    df.write.parquet(self.output_path)
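The cast to double in transform matters when the CSV is read with header=True alone. Without inferSchema, every column comes back as a string, and strings compare character by character. Below is a plain-Python illustration of that ordering on the sample PluginDuration values. Spark's max and sort on a string column follow the same lexicographic rule, so max over the raw strings would report the 3.23 row, not the 25.47 one.

```python
# The three sample PluginDuration values as strings, which is how they
# arrive from read.csv when no schema is inferred or supplied
as_text = ["3.2333333333333334", "1.8166666666666667", "25.466666666666665"]

text_max = max(as_text)                       # compares character by character
numeric_max = max(float(v) for v in as_text)  # compares numerically

print(text_max)
print(numeric_max)
```

Passing inferSchema=True to read.csv is the other common way to get numeric columns without an explicit cast.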
Upvotes: 0
Reputation: 707
I adapted an SQL approach to the DataFrame methods; it works and produces the expected output.
from pyspark.sql import functions as F
# get the max and average values from the column
mx = df.agg({'PluginDuration':'max'}).collect()[0][0]
av = df.agg({'PluginDuration':'avg'}).collect()[0][0]
# add max and avg columns, sort by PluginDuration descending,
# select and rename the columns, then keep only the top row
df\
.withColumn('max_value', F.lit(round(mx,2)))\
.withColumn('avg_value', F.lit(round(av,2)))\
.sort('PluginDuration', ascending = False)\
.selectExpr('CPID as id', 'max_value', 'avg_value')\
.limit(1)\
.show()
Upvotes: 1