Reputation: 441
In the input data frame below, I want to select, for each month, the timeframe where Diff_from_50 is lowest. If several rows tie on that value, the one with the lowest AvgWindSpeed should be selected.
What would be the best way to do this in Scala? I've been working with the following code, but when I group by Month I lose my other columns. I'm also not sure how to compare the temperature differences and then pick the row with the lowest wind speed when there are ties.
Any suggestions/tips would be appreciated.
Current Code:
val oshdata = osh
  .select(
    col("TemperatureF"),
    col("Wind SpeedMPH"),
    concat(
      format_string("%02d", col("Month")), lit("/"),
      format_string("%02d", col("Day")), lit("/"),
      col("Year"), lit(" "), col("TimeCST")
    ).as("Date"))
  .withColumn("TemperatureF", when(col("TemperatureF").equalTo(-9999), null).otherwise(col("TemperatureF")))
  .withColumn("Wind SpeedMPH", when(col("Wind SpeedMPH").equalTo(-9999), null).otherwise(col("Wind SpeedMPH")))
  .withColumn("WindSpeed", when($"Wind SpeedMPH" === "Calm", 0).otherwise($"Wind SpeedMPH"))
val ts = to_timestamp($"Date", "MM/dd/yyyy hh:mm a")
val Oshmydata = oshdata.withColumn("ts", ts)
val OshgroupByWindow = Oshmydata
  .groupBy(window(col("ts"), "1 hour"))
  .agg(avg("TemperatureF").as("avgTemp"), avg("WindSpeed").as("AvgWindSpeed"))
  .select("window.start", "window.end", "avgTemp", "AvgWindSpeed")
val Oshdaily = OshgroupByWindow
  .withColumn("_tmp", split($"start", " "))
  .select(
    $"_tmp".getItem(0).as("Date"),
    date_format($"_tmp".getItem(1), "hh:mm:ss a").as("startTime"),
    $"end", $"avgTemp", $"AvgWindSpeed")
  .withColumn("_tmp2", split($"end", " "))
  .select(
    $"Date", $"StartTime",
    date_format($"_tmp2".getItem(1), "hh:mm:ss a").as("EndTime"),
    $"avgTemp", $"AvgWindSpeed")
  .withColumn("Diff_From_50", abs($"avgTemp" - 50))
val OshfinalData = Oshdaily
  .select(col("*"), month(col("Date")).as("Month"))
  .orderBy($"Month", $"StartTime")
OshfinalData.createOrReplaceTempView("oshView")
val testing = OshfinalData
  .select(col("*"))
  .groupBy($"Month", $"StartTime")
  .agg(avg($"avgTemp").as("avgTemp"), avg($"AvgWindSpeed").as("AvgWindSpeed"))
val withDiff = testing.withColumn("Diff_from_50", abs($"avgTemp" - 50))
withDiff.select(col("*")).groupBy($"Month").agg(min("Diff_from_50")).show()
Input Data Frame:
+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|01:00:00 AM|17.375469072164957| 8.336983230663929|  32.62453092783504|
|    1|01:00:00 PM| 23.70729813664597|10.294075601374567|  26.29270186335403|
|    1|02:00:00 AM| 17.17661058638331| 8.332715559474817| 32.823389413616695|
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    1|03:00:00 AM|16.979751170960192| 8.305847424684158|  33.02024882903981|
|    1|03:00:00 PM| 23.78028142954523|11.131929492774708|  26.21971857045477|
|    2|01:00:00 AM| 18.19221536796537| 8.104439935064937|  31.80778463203463|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
|    2|02:00:00 AM|  17.7650265755505| 8.142266514806375|   32.2349734244495|
|    2|02:00:00 PM|25.602093162953263|11.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+
Expected output:
+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+
Upvotes: 1
Views: 1092
Reputation: 112
You can use a window function with row_number, partitioned by Month and ordered by Diff_from_50 and then AvgWindSpeed:
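import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Within each month, order by closeness to 50 first, then by wind speed to break ties
val monthsLowest = Window
  .partitionBy(col("Month"))
  .orderBy(col("Diff_from_50").asc, col("AvgWindSpeed").asc)

// Keep only the top-ranked row per month; df is the input data frame from the question
df.withColumn("rn", row_number().over(monthsLowest))
  .where(col("rn") === 1)
  .drop("rn")
  .show()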
This gives the expected output shown in the question.
For more information about window functions in Spark, see this guide: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html
You can also take a look at the answers to "How to select the first row of each group?".
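If you would rather stay with groupBy, one alternative is to aggregate with min over a struct, which should keep the other columns. This is only a minimal sketch, not a tested drop-in: the object name LowestPerMonth, the helper lowestPerMonth, and the local SparkSession are made up for illustration, and the sample rows are a rounded subset of the question's input. It relies on Spark comparing structs field by field, so the sort keys (Diff_from_50, then AvgWindSpeed) have to come first in the struct.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min, struct}

object LowestPerMonth {
  // Hypothetical helper: returns one row per Month, choosing the smallest
  // Diff_from_50 and breaking ties on the smallest AvgWindSpeed.
  def lowestPerMonth(df: DataFrame): DataFrame =
    df.groupBy(col("Month"))
      // Structs are compared field by field, left to right, so the two
      // sort keys go first and the remaining columns ride along.
      .agg(min(struct(
        col("Diff_from_50"), col("AvgWindSpeed"),
        col("StartTime"), col("avgTemp"))).as("best"))
      // Unpack the struct back into the original column order.
      .select(
        col("Month"),
        col("best.StartTime"),
        col("best.avgTemp"),
        col("best.AvgWindSpeed"),
        col("best.Diff_from_50"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption, not from the question).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lowest-per-month")
      .getOrCreate()
    import spark.implicits._

    // Rounded subset of the question's input, including the ties.
    val withDiff = Seq(
      (1, "01:00:00 PM", 23.71, 10.29, 26.29),
      (1, "02:00:00 PM", 23.78, 10.13, 26.22),
      (1, "03:00:00 PM", 23.78, 11.13, 26.22),
      (2, "01:00:00 PM", 25.60, 10.76, 24.40),
      (2, "02:00:00 PM", 25.60, 11.76, 24.40)
    ).toDF("Month", "StartTime", "avgTemp", "AvgWindSpeed", "Diff_from_50")

    lowestPerMonth(withDiff).orderBy(col("Month")).show()
    spark.stop()
  }
}
```

On this sample it should pick 02:00:00 PM for month 1 and 01:00:00 PM for month 2, the same rows as the expected output. The window version is probably easier to extend (for example, to keep the top N rows per month), while the struct version avoids the extra rn column.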
Upvotes: 2