peakstatus

Reputation: 441

Scala - Selecting minimum values by group

From the input data frame below, I want to select, for each month, the time frame where Diff_from_50 is lowest. If several rows tie on that value, the one with the lowest AvgWindSpeed should be selected.

What would be the best way to do this in Scala? I've been working with the following code, but when I group by Month I lose my other columns. I'm also not sure how to compare the temperature differences and then pick the row with the lowest wind speed when there are ties.

Any suggestions/tips would be appreciated.

Current Code:

val oshdata = osh
  .select(
    col("TemperatureF"),
    col("Wind SpeedMPH"),
    concat(format_string("%02d", col("Month")), lit("/"), format_string("%02d", col("Day")), lit("/"), col("Year"), lit(" "), col("TimeCST")).as("Date"))
  .withColumn("TemperatureF", when(col("TemperatureF").equalTo(-9999), null).otherwise(col("TemperatureF")))
  .withColumn("Wind SpeedMPH", when(col("Wind SpeedMPH").equalTo(-9999), null).otherwise(col("Wind SpeedMPH")))
  .withColumn("WindSpeed", when($"Wind SpeedMPH" === "Calm", 0).otherwise($"Wind SpeedMPH"))

val ts = to_timestamp($"Date","MM/dd/yyyy hh:mm a")
val Oshmydata=oshdata.withColumn("ts",ts)
val OshgroupByWindow = Oshmydata
  .groupBy(window(col("ts"), "1 hour"))
  .agg(avg("TemperatureF").as("avgTemp"), avg("WindSpeed").as("AvgWindSpeed"))
  .select("window.start", "window.end", "avgTemp", "AvgWindSpeed")
val Oshdaily = OshgroupByWindow
  .withColumn("_tmp", split($"start", " "))
  .select($"_tmp".getItem(0).as("Date"), date_format($"_tmp".getItem(1), "hh:mm:ss a").as("startTime"), $"end", $"avgTemp", $"AvgWindSpeed")
  .withColumn("_tmp2", split($"end", " "))
  .select($"Date", $"StartTime", date_format($"_tmp2".getItem(1), "hh:mm:ss a").as("EndTime"), $"avgTemp", $"AvgWindSpeed")
  .withColumn("Diff_From_50", abs($"avgTemp" - 50))
val OshfinalData = Oshdaily
  .select(col("*"), month(col("Date")).as("Month"))
  .orderBy($"Month", $"StartTime")
OshfinalData.createOrReplaceTempView("oshView")


val testing = OshfinalData.select(col("*")).groupBy($"Month",$"StartTime").agg(avg($"avgTemp").as("avgTemp"),avg($"AvgWindSpeed").as("AvgWindSpeed"))

val withDiff = testing.withColumn("Diff_from_50",abs($"avgTemp"-50))

withDiff.select(col("*")).groupBy($"Month").agg(min("Diff_from_50")).show()

Input Data Frame:

+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|01:00:00 AM|17.375469072164957| 8.336983230663929|  32.62453092783504|
|    1|01:00:00 PM| 23.70729813664597|10.294075601374567|  26.29270186335403|
|    1|02:00:00 AM| 17.17661058638331| 8.332715559474817| 32.823389413616695|
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    1|03:00:00 AM|16.979751170960192| 8.305847424684158|  33.02024882903981|
|    1|03:00:00 PM| 23.78028142954523|11.131929492774708|  26.21971857045477|
|    2|01:00:00 AM| 18.19221536796537| 8.104439935064937|  31.80778463203463|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
|    2|02:00:00 AM|  17.7650265755505| 8.142266514806375|   32.2349734244495|
|    2|02:00:00 PM|25.602093162953263|11.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+

Expected output:

+-----+-----------+------------------+------------------+-------------------+
|Month|  StartTime|           avgTemp|      AvgWindSpeed|       Diff_from_50|
+-----+-----------+------------------+------------------+-------------------+
|    1|02:00:00 PM| 23.78028142954523|10.131929492774708|  26.21971857045477|
|    2|01:00:00 PM|25.602093162953263|10.756156072520753| 24.397906837046737|
+-----+-----------+------------------+------------------+-------------------+

Upvotes: 1

Views: 1092

Answers (1)

Kobotan

Reputation: 112

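You can use a window function: partition by Month, order by Diff_from_50 and then AvgWindSpeed, and keep the first row of each partition: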

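    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    // Within each month: lowest Diff_from_50 first, ties broken by lowest AvgWindSpeed
    val monthsLowest = Window
      .partitionBy($"Month")
      .orderBy($"Diff_from_50".asc, $"AvgWindSpeed".asc)

    // df is the input data frame from the question; $"..." needs spark.implicits._
    df.withColumn("rn", row_number().over(monthsLowest))
      .where($"rn" === 1)
      .drop("rn")
      .show()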

This produces the expected output shown in the question.

For more information about window functions in Spark, there is a great guide: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-windows.html

You can also take a look at the answers to: How to select the first row of each group?
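
If you would rather not use a window, a common alternative is to aggregate over a struct. The sketch below is only an illustration, not something taken from the question's code: the object name MinByGroupSketch, the helper lowestPerMonth, the local SparkSession settings and the shortened sample values are all assumptions made for the example. It relies on Spark comparing structs field by field, so min over a struct that starts with Diff_from_50 and AvgWindSpeed picks the lowest difference and breaks ties on wind speed, while the other fields are carried along.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, min, struct}

object MinByGroupSketch {
  // Hypothetical helper: one row per Month with the lowest Diff_from_50,
  // ties broken by the lowest AvgWindSpeed.
  def lowestPerMonth(df: DataFrame): DataFrame =
    df.groupBy(col("Month"))
      // Field order matters: structs are compared left to right.
      .agg(min(struct(col("Diff_from_50"), col("AvgWindSpeed"), col("StartTime"), col("avgTemp"))).as("best"))
      // Unpack the winning struct back into the original column layout.
      .select(col("Month"), col("best.StartTime"), col("best.avgTemp"), col("best.AvgWindSpeed"), col("best.Diff_from_50"))
      .orderBy(col("Month"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; master and app name are arbitrary.
    val spark = SparkSession.builder().master("local[*]").appName("min-by-group-sketch").getOrCreate()
    import spark.implicits._

    // A few rows from the question's input, with values shortened for readability.
    val df = Seq(
      (1, "01:00:00 PM", 23.707, 10.294, 26.293),
      (1, "02:00:00 PM", 23.780, 10.132, 26.220),
      (1, "03:00:00 PM", 23.780, 11.132, 26.220),
      (2, "01:00:00 PM", 25.602, 10.756, 24.398),
      (2, "02:00:00 PM", 25.602, 11.756, 24.398)
    ).toDF("Month", "StartTime", "avgTemp", "AvgWindSpeed", "Diff_from_50")

    lowestPerMonth(df).show()
    spark.stop()
  }
}
```

On these sample rows it keeps 02:00:00 PM for month 1 and 01:00:00 PM for month 2, matching the expected output. The window version is usually easier to read and extend; the struct version uses a single aggregation, and every column you want to keep has to be listed inside the struct.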

Upvotes: 2
