ezamur

Reputation: 2182

Spark SQL window over an interval between two specified time boundaries - between 3 hours ago and 2 hours ago

What is the proper way of specifying window interval in Spark SQL, using two predefined boundaries?

I am trying to sum up values from my table over a window of "between 3 hours ago and 2 hours ago".

When I run this query:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

That works. I get the results I expect, i.e. sums of values that fall into a 2-hour rolling window.

Now, what I need is for that rolling window not to be bound to the current row, but to take into account rows between 3 hours ago and 2 hours ago. I tried:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

But I get an extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'} error.

I also tried with:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

but then I get a different error: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)

The third option I tried is:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

and, as one might expect, it doesn't work either: cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch

I am having difficulty finding documentation for the interval type: this link doesn't say enough, and the other information I have found is half-baked. At least what I found is.

Upvotes: 5

Views: 5771

Answers (4)

Twilight

Reputation: 21

I know this is an old question, but I thought I'd point out that the original issue is syntax.

You have:

RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING

but that mixes an interval with a plain integer. This should work:

RANGE BETWEEN interval 3 hours PRECEDING AND interval 2 hours PRECEDING
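
For reference, applied to the original query this reads as below. It is the same form as the question's second attempt, so it presumably requires a Spark version in which interval frame boundaries are supported:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;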

Upvotes: 1

user13006309

Reputation: 11

A workaround for getting the same result would be to calculate the sum of the value within the last 3 hours and then subtract the sum of the value within the last 2 hours:

select *, 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 3 hours preceding and current row) 
- 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 2 hours preceding and current row) 
as sum_value
from my_temp_table;
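
One subtle difference worth noting: a row timestamped exactly 2 hours before the current row falls into both frames and therefore cancels out of the difference, whereas a frame of range between interval 3 hours preceding and interval 2 hours preceding would include it.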

Upvotes: 1

SakuraFreak

Reputation: 315

I had the same issue and found a simple resolution. Here you go:

unix_timestamp(datestamp1) - unix_timestamp(datestamp2) < 10800 -- 3 hours in seconds; datestamp1 and datestamp2 are the two timestamps being compared

You can also use a timestamp for readability (not sure whether it's needed):

select unix_timestamp(date_format(current_timestamp, 'HH:mm:ss'), 'HH:mm:ss') <
       unix_timestamp('03:00:00', 'HH:mm:ss') -- used timestamp for readability
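
Building on the same seconds arithmetic, the original windowed sum can also be expressed without interval boundaries at all: order the window by the timestamp converted to epoch seconds and use plain numeric range boundaries, which Spark accepts. A sketch, assuming time_value can be cast to a timestamp:

select *, sum(value) over (
partition by a, b
order by cast(cast(time_value as timestamp) as bigint) -- epoch seconds
range between 10800 preceding and 7200 preceding -- 3 hours ago to 2 hours ago
) as sum_value
from my_temp_table;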

Upvotes: 0

ezamur

Reputation: 2182

Since range intervals didn't do the trick, I had to turn to an alternative approach. It goes something like this:

  • prepare a list of intervals for which computation needs to be performed
  • for each of the intervals, run the computation
    • each of those iterations produces a data frame
  • after the iterations, we have a list of data frames
  • union the data frames from the list into one bigger data frame
  • write out the results

In my case, I had to run the computation for each hour of the day and combine those "hourly" results, i.e. a list of 24 data frames, into one "daily" data frame.

The code, from a very high-level perspective, looks like this:

import org.apache.spark.sql.functions.lit
import spark.implicits._ // assumes a SparkSession named spark, for the $"hour" syntax

// one data frame per (start, end) hour pair
val hourlyDFs = for ((hourStart, hourEnd) <- hoursToStart.zip(hoursToEnd)) yield {
    val hourlyData = data.where($"hour" >= lit(hourStart) && $"hour" <= lit(hourEnd))
    // do stuff with hourlyData
    // return a data frame
}
// combine the per-hour results into one data frame
hourlyDFs.reduce(_ union _)
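
For two dozen frames, folding with union like this is fine: union is a lazy transformation, so Spark plans a single job over the combined lineage when the result is finally written out. For much longer lists of data frames it can be worth inspecting the resulting query plan, since every union adds a level to it.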

Upvotes: 1
