Misha

Reputation: 133

Scala program to search for the most recent values

I want to create a DataFrame based on the Hive SQL below:

WITH FILTERED_table1 AS (
  SELECT *,
    row_number() OVER (PARTITION BY key_timestamp ORDER BY datime DESC) rn
  FROM table1)

My Scala attempt:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val table1 = Window.partitionBy($"key_timestamp").orderBy($"datime".desc)

I looked into window functions and this is what I could come up with, but I'm not sure how to write it as a Scala function since I'm very new to Scala. How do I return a DataFrame from this SQL using Scala functions? Any suggestions will be greatly appreciated. :)

Upvotes: 0

Views: 38

Answers (1)

Leo C

Reputation: 22449

Your Window spec is correct. Using a dummy dataset standing in for your Hive table, let's first load table1 into a DataFrame:

val df = spark.sql("""select * from table1""")

df.show
// +-------------+-------------------+
// |key_timestamp|             datime|
// +-------------+-------------------+
// |            1|2018-06-01 00:00:00|
// |            1|2018-07-01 00:00:00|
// |            2|2018-05-01 00:00:00|
// |            2|2018-07-01 00:00:00|
// |            2|2018-06-01 00:00:00|
// +-------------+-------------------+
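
If you're following along without a real Hive table, a comparable DataFrame can be built in-memory and registered as a temp view so the spark.sql calls here still work. This is just a sketch, with the column names and types assumed from the output above:

import java.sql.Timestamp
import spark.implicits._

// Dummy data matching the assumed schema (Int key, Timestamp datime)
val df = Seq(
  (1, Timestamp.valueOf("2018-06-01 00:00:00")),
  (1, Timestamp.valueOf("2018-07-01 00:00:00")),
  (2, Timestamp.valueOf("2018-05-01 00:00:00")),
  (2, Timestamp.valueOf("2018-07-01 00:00:00")),
  (2, Timestamp.valueOf("2018-06-01 00:00:00"))
).toDF("key_timestamp", "datime")

// Expose the DataFrame to SQL under the same table name
df.createOrReplaceTempView("table1")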

To apply the window function row_number over the Window spec to the DataFrame, use withColumn to add a new column that captures the function's result:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy($"key_timestamp").orderBy($"datime".desc)

val resultDF = df.withColumn("rn", row_number().over(window))

resultDF.show
// +-------------+-------------------+---+
// |key_timestamp|             datime| rn|
// +-------------+-------------------+---+
// |            1|2018-07-01 00:00:00|  1|
// |            1|2018-06-01 00:00:00|  2|
// |            2|2018-07-01 00:00:00|  1|
// |            2|2018-06-01 00:00:00|  2|
// |            2|2018-05-01 00:00:00|  3|
// +-------------+-------------------+---+
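
Since your question title suggests you want the most recent value per key, you can then keep only the rows where rn is 1; a small sketch of that (assuming the rn = 1 rows are what you're after):

// Keep only the latest datime per key_timestamp, then drop the helper column
val latestDF = resultDF.filter($"rn" === 1).drop("rn")

latestDF.show
// +-------------+-------------------+
// |key_timestamp|             datime|
// +-------------+-------------------+
// |            1|2018-07-01 00:00:00|
// |            2|2018-07-01 00:00:00|
// +-------------+-------------------+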

To verify the row numbers, run your SQL against table1 and you should see the same result as resultDF above:

spark.sql("""
    select *, row_number() over
      (partition by key_timestamp order by datime desc) rn
    from table1
  """).show

Upvotes: 1
