Reputation: 86
Given a table with entries at irregular timestamps, "breaks" must be inserted at regular 5 min intervals (the associated data can/will be NULL).
I was thinking of getting the start time and writing a subquery with a window function that adds 5 min intervals to the start time, but I could only think of using row_number to increment the values.
WITH data as(
select id, data,
cast(date_and_time as double) * 1000 as time_milliseconds
from t1), -- original data
start_times as(
select id, MIN(CAST(date_and_time as double) * 1000) as start_time
from t1
GROUP BY id
), -- first timestamp for each id
boundries as (
SELECT T1.id,(row_number() OVER (PARTITION BY T1.id ORDER BY T1.date_and_time)-1) *300000 + start_times.start_time
as boundry
from T1
INNER JOIN start_times ON start_times.id= T1.id
) -- increment the number of 5 min added on each row and later full join boundries table with original data
However, this limits me to the number of rows present for an id in the original data table. If the timestamps are spread out, there are not enough rows to cover the number of 5 min intervals that need to be added.
sample data:
initial data:
|-----------|------------------|------------------|
| id | value | timestamp |
|-----------|------------------|------------------|
| 1 | 3 | 12:00:01.011 |
|-----------|------------------|------------------|
| 1 | 4 | 12:03:30.041 |
|-----------|------------------|------------------|
| 1 | 5 | 12:12:20.231 |
|-----------|------------------|------------------|
| 1 | 3 | 15:00:00.312 |
data after my query:
|-----------|------------------|------------------|
| id | value | timestamp (UNIX) |
|-----------|------------------|------------------|
| 1 | 3 | 12:00:01 |
|-----------|------------------|------------------|
| 1 | 4 | 12:03:30 |
|-----------|------------------|------------------|
| 1 | NULL | 12:05:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | NULL | 12:10:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | 5 | 12:12:20 |
|-----------|------------------|------------------|
| 1 | NULL | 12:15:01 | <-- Data from "boundries"
|-----------|------------------|------------------|
| 1 | NULL | 12:20:01 | <-- Data from "boundries"
|-----------|------------------|------------------| <-- Jumping directly to 15:00:00 (WRONG! :( need to insert more 5 min breaks here )
| 1 | 3 | 15:00:00 |
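To put a number on the shortfall, here is a rough sketch in plain Python (not Hive) that counts how many 5 min boundaries the sample rows above would need. The date is an arbitrary placeholder and the variable names are made up for illustration.

```python
from datetime import datetime

FIVE_MIN = 300  # seconds

# Sample timestamps from the "initial data" table; the date is a placeholder.
stamps = ["12:00:01.011", "12:03:30.041", "12:12:20.231", "15:00:00.312"]
times = [datetime.strptime("2020-01-01 " + s, "%Y-%m-%d %H:%M:%S.%f") for s in stamps]

span_seconds = (max(times) - min(times)).total_seconds()

# Boundaries strictly after the first timestamp and not past the last one.
needed = int(span_seconds // FIVE_MIN)

# row_number() can produce at most one boundary per source row.
rows_available = len(times)

print(needed, rows_available)
```

The span is just under three hours, so far more boundaries are needed than there are source rows to hang them on.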
I was thinking of creating a temporary table inside Hive and filling it with x rows representing 5 min intervals from the start time to the end time of the data table, but I couldn't find any way of accomplishing that.
Is there any way of using "for loops"? Any suggestions would be appreciated.
Thanks
Upvotes: 2
Views: 207
Reputation: 38325
You can try calculating the difference in seconds between the current timestamp and the next one, dividing it by 300 to get the number of 5 min ranges, producing a string of spaces with length = num_ranges, and then splitting and exploding that string to generate rows.
Demo:
with your_table as (--initial data example
select stack (3,
1,3 ,'2020-01-01 12:00:01.011',
1,4 ,'2020-01-01 12:03:30.041',
1,5 ,'2020-01-01 12:20:20.231'
) as (id ,value ,ts )
)
select id ,value, ts, next_ts,
diff_sec,num_intervals,
from_unixtime(unix_timestamp(ts)+h.i*300) new_ts, coalesce(from_unixtime(unix_timestamp(ts)+h.i*300),ts) as calculated_timestamp
from
(
select id ,value ,ts, next_ts, (unix_timestamp(next_ts)-unix_timestamp(ts)) diff_sec,
floor((unix_timestamp(next_ts)-unix_timestamp(ts))/300 --diff in seconds/5 min
) num_intervals
from
(
select id ,value ,ts, lead(ts) over(partition by id order by ts) next_ts --next timestamp within the same id
from your_table
) s
)s
lateral view outer posexplode(split(space(cast(s.num_intervals as int)),' ')) h as i,x --this will generate rows
Result:
id value ts next_ts diff_sec num_intervals new_ts calculated_timestamp
1 3 2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209 0 2020-01-01 12:00:01 2020-01-01 12:00:01
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:03:30 2020-01-01 12:03:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:08:30 2020-01-01 12:08:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:13:30 2020-01-01 12:13:30
1 4 2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010 3 2020-01-01 12:18:30 2020-01-01 12:18:30
1 5 2020-01-01 12:20:20.231 \N \N \N \N 2020-01-01 12:20:20.231
Additional rows were added. I left all intermediate columns for debugging purposes.
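For readers who want to check the row-generation logic outside Hive, the same idea can be sketched in plain Python. This is only an illustration under a few assumptions: `fill_gaps` is a made-up name, the generated rows carry `None` instead of repeating the value, and fractional seconds are kept rather than truncated as `unix_timestamp` does.

```python
from datetime import datetime, timedelta

STEP = timedelta(minutes=5)

def fill_gaps(rows):
    """rows: list of (id, value, ts) for one id, sorted by ts.
    Returns the original rows plus a None-valued row every 5 min inside each gap."""
    out = []
    for idx, (id_, value, ts) in enumerate(rows):
        out.append((id_, value, ts))
        if idx + 1 == len(rows):
            break  # the last row has no next timestamp
        next_ts = rows[idx + 1][2]
        num_intervals = (next_ts - ts) // STEP  # same as floor(diff_sec / 300)
        for i in range(1, num_intervals + 1):
            out.append((id_, None, ts + i * STEP))
    return out

def parse(s):
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")

# Same three rows as the demo query.
demo = [
    (1, 3, parse("2020-01-01 12:00:01.011")),
    (1, 4, parse("2020-01-01 12:03:30.041")),
    (1, 5, parse("2020-01-01 12:20:20.231")),
]
filled = fill_gaps(demo)
for row in filled:
    print(row)
```

As in the query, the generated timestamps are offset from the preceding row's timestamp, not from the very first timestamp of the id.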
Upvotes: 2
Reputation: 10035
A recursive query could be helpful here, but Hive does not support them.
You may consider creating the table outside of Hive or writing a UDF.
Either way, this query can be expensive, so materialized views/tables are recommended, depending on how frequently you run it.
The example below uses a UDF, inbetween, created using PySpark to run the query. It uses CTEs and the UDF to create a temporary table, intervals, and from it possible_records.
The code below shows how it was evaluated using Hive:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,ArrayType
inbetween = lambda min_value,max_value : [*range(min_value,max_value,5*60)]
udf_inbetween = udf(inbetween,ArrayType(IntegerType()))
sqlContext.udf.register("inbetween",udf_inbetween)
sqlContext.sql("""
WITH max_timestamp(t) as (
select max(timestamp) as t from initial_data2
),
min_timestamp(t) as (
select min(timestamp) as t from initial_data2
),
intervals as (
select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
min_timestamp mint, max_timestamp maxt
),
unique_ids as (
select distinct id from initial_data2
),
interval_times as (
select interval_time from (
select
cast(from_unixtime(interval_time) as timestamp) as interval_time
from
intervals
UNION
select distinct d.timestamp as interval_time from initial_data2 d
)
order by interval_time asc
),
possible_records as (
select
distinct
d.id,
i.interval_time
FROM
interval_times i, unique_ids d
)
select
p.id,
d.value,
split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
possible_records p
LEFT JOIN
initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time
ORDER BY p.id, p.interval_time
""").show(20)
+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
| 1| 3| 12:00:01|
| 1| 4| 12:03:30|
| 1| null| 12:05:01|
| 1| null| 12:10:01|
| 1| 5| 12:12:20|
| 1| null| 12:15:01|
| 1| null| 12:20:01|
| 1| null| 12:25:01|
| 1| null| 12:30:01|
| 1| null| 12:35:01|
| 1| null| 12:40:01|
| 1| null| 12:45:01|
| 1| null| 12:50:01|
| 1| null| 12:55:01|
| 1| null| 13:00:01|
| 1| null| 13:05:01|
| 1| null| 13:10:01|
| 1| null| 13:15:01|
| 1| null| 13:20:01|
| 1| null| 13:25:01|
+---+-----+---------+
only showing top 20 rows
# Setup that creates initial_data2 for the query above
from pyspark.sql import Row
raw_data1 = [
{"id":1,"value":3,"timestam":"12:00:01"},
{"id":1,"value":4,"timestam":"12:03:30"},
{"id":1,"value":5,"timestam":"12:12:20"},
{"id":1,"value":3,"timestam":"15:00:00"},
]
raw_data = [*map(lambda entry : Row(**entry),raw_data1)]
initial_data = sqlContext.createDataFrame(raw_data,schema="id int, value int, timestam string ")
initial_data.createOrReplaceTempView('initial_data')
sqlContext.sql("create or replace temp view initial_data2 as select id,value,cast(timestam as timestamp) as timestamp from initial_data")
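The logic of the CTEs can also be followed without Spark. The sketch below is a plain Python approximation, not the code that was run: `with_breaks` is a hypothetical helper, the date is a placeholder, and it assumes each (id, timestamp) pair appears at most once.

```python
from datetime import datetime, timedelta

def with_breaks(records, step=timedelta(minutes=5)):
    """records: list of (id, value, ts).
    Returns (id, value_or_None, ts) tuples sorted by id, then ts."""
    all_ts = [ts for _, _, ts in records]
    start, end = min(all_ts), max(all_ts)

    # "interval_times": the 5 min grid unioned with the original timestamps.
    grid = set(all_ts)
    t = start
    while t < end:  # half-open, like range(min, max, 300) in the UDF
        grid.add(t)
        t += step

    # "possible_records" left-joined back to the original data.
    lookup = {(id_, ts): value for id_, value, ts in records}
    ids = sorted({id_ for id_, _, _ in records})
    return [(id_, lookup.get((id_, ts)), ts) for id_ in ids for ts in sorted(grid)]

def at(hms):
    return datetime.strptime("2020-01-01 " + hms, "%Y-%m-%d %H:%M:%S")

records = [(1, 3, at("12:00:01")), (1, 4, at("12:03:30")),
           (1, 5, at("12:12:20")), (1, 3, at("15:00:00"))]
result = with_breaks(records)
for row in result[:7]:
    print(row)
```

Like the query, this takes the minimum and maximum timestamp across all ids, so every id gets the same grid.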
Upvotes: 2