habarnam

Reputation: 86

Insert extra rows in SQL query result

Given a table with entries at irregular timestamps, "breaks" must be inserted at regular 5-minute intervals (the associated data can/will be NULL).

I was thinking of getting the start time and making a subquery with a window function that adds 5-minute intervals to the start time, but the only way I could think of to increment the values was row_number.

WITH data as (  -- original data
    select id, data,
           cast(date_and_time as double) * 1000 as time_milliseconds
    from t1
),

start_times as (  -- first timestamp for each id
    select id, MIN(CAST(date_and_time as double) * 1000) as start_time
    from t1
    GROUP BY id
),

boundries as (  -- add one more 5-minute step per row; later FULL JOIN boundries with the original data
    SELECT t1.id,
           (row_number() OVER (PARTITION BY t1.id ORDER BY t1.date_and_time) - 1) * 300000
               + start_times.start_time as boundry
    from t1
    INNER JOIN start_times ON start_times.id = t1.id
)

However, this limits me to the number of rows present for an id in the original data table: if the timestamps are spread out, there aren't enough rows to cover all the 5-minute intervals that need to be added (for example, the gap between 12:12:20 and 15:00:00 in the sample below needs about 33 extra boundaries, but only one original row follows it).

sample data:

initial data:

 |-----------|------------------|------------------|
 |   id      |     value        |    timestamp     |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01.011  | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30.041  |
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20.231  |
 |-----------|------------------|------------------|
 |     1     |    3             |    15:00:00.312  |
 |-----------|------------------|------------------|

data after my query:

 |-----------|------------------|------------------|
 |   id      |     value        | timestamp (UNIX) |
 |-----------|------------------|------------------|
 |     1     |    3             |    12:00:01      | 
 |-----------|------------------|------------------|
 |     1     |    4             |    12:03:30      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:05:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:10:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    5             |    12:12:20      |
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:15:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    NULL          |    12:20:01      |  <-- Data from "boundries"
 |-----------|------------------|------------------|
 |     1     |    3             |    15:00:00      |  <-- Jumps straight from 12:20:01 to 15:00:00 (WRONG! more 5-minute breaks need to be inserted here)
 |-----------|------------------|------------------|



I was thinking of creating a temporary table inside Hive and filling it with x rows representing 5-minute intervals from the start time to the end time of the data table, but I couldn't find any way of accomplishing that.

Is there any way of using "for loops"? Any suggestions would be appreciated.

Thanks

Upvotes: 2

Views: 207

Answers (2)

leftjoin

Reputation: 38325

You can try calculating the difference between the current timestamp and the next one, dividing it by 300 to get the number of 5-minute ranges, producing a string of spaces with length = num_ranges, and exploding it to generate the rows.
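To see the row-generation idiom in isolation: space(n) builds a string of n spaces, split(..., ' ') turns it into an array of n+1 empty strings, and posexplode emits one row per element together with its index, so each source row fans out into num_intervals + 1 rows. A minimal sketch:

select h.i
from (select 3 as num_intervals) s
lateral view posexplode(split(space(s.num_intervals), ' ')) h as i, x
-- returns i = 0, 1, 2, 3  (num_intervals + 1 rows)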

Demo:

with your_table as ( -- initial data example
  select stack(3,
    1, 3, '2020-01-01 12:00:01.011',
    1, 4, '2020-01-01 12:03:30.041',
    1, 5, '2020-01-01 12:20:20.231'
  ) as (id, value, ts)
)

select id, value, ts, next_ts,
       diff_sec, num_intervals,
       from_unixtime(unix_timestamp(ts) + h.i * 300)               as new_ts,
       coalesce(from_unixtime(unix_timestamp(ts) + h.i * 300), ts) as calculated_timestamp
from
(
  select id, value, ts, next_ts,
         (unix_timestamp(next_ts) - unix_timestamp(ts))              as diff_sec,
         floor((unix_timestamp(next_ts) - unix_timestamp(ts)) / 300) as num_intervals -- diff in seconds / 5 min
  from
  (
    select id, value, ts, lead(ts) over (order by ts) as next_ts
    from your_table
  ) s
) s
lateral view outer posexplode(split(space(cast(s.num_intervals as int)), ' ')) h as i, x -- this generates the rows

Result:

id  value   ts                      next_ts                 diff_sec    num_intervals   new_ts              calculated_timestamp
1   3       2020-01-01 12:00:01.011 2020-01-01 12:03:30.041 209          0              2020-01-01 12:00:01 2020-01-01 12:00:01
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:03:30 2020-01-01 12:03:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:08:30 2020-01-01 12:08:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:13:30 2020-01-01 12:13:30
1   4       2020-01-01 12:03:30.041 2020-01-01 12:20:20.231 1010         3              2020-01-01 12:18:30 2020-01-01 12:18:30
1   5       2020-01-01 12:20:20.231 \N                      \N           \N             \N                  2020-01-01 12:20:20.231

Additional rows were added. I left all intermediate columns for debugging purposes.
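Two small adjustments may be needed to match the desired output in the question: the value should presumably be NULL on the generated break rows rather than repeated, and if the table holds several ids the lead() should be partitioned by id so gaps are not computed across ids. A sketch combining both, reusing the same sample data:

with your_table as ( -- same sample data as above
  select stack(3,
    1, 3, '2020-01-01 12:00:01.011',
    1, 4, '2020-01-01 12:03:30.041',
    1, 5, '2020-01-01 12:20:20.231'
  ) as (id, value, ts)
)
select s.id,
       case when coalesce(h.i, 0) = 0 then s.value end                  as value,  -- keep the value only on original rows
       coalesce(from_unixtime(unix_timestamp(s.ts) + h.i * 300), s.ts)  as calculated_timestamp
from
(
  select id, value, ts,
         floor((unix_timestamp(next_ts) - unix_timestamp(ts)) / 300) as num_intervals
  from
  (
    select id, value, ts,
           lead(ts) over (partition by id order by ts) as next_ts  -- partition so gaps are computed per id
    from your_table
  ) t
) s
lateral view outer posexplode(split(space(cast(s.num_intervals as int)), ' ')) h as i, x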

Upvotes: 2

ggordon

Reputation: 10035

A recursive query could be helpful here, but Hive does not support recursive CTEs.

You may consider creating the table outside of Hive or writing a UDF.

Either way, this query can be expensive, and the use of materialized views/tables is recommended depending on how frequently you run it.

The example registers a UDF, inbetween, created using PySpark, and uses it to run the query. It:

  1. generates the values in between the min and max timestamps of the dataset (a quick check of this step is sketched right after this list)
  2. uses CTEs and the UDF to build a temporary result set, intervals
  3. generates all possible intervals using an expensive cross join in possible_records
  4. uses a left join to retrieve the records with actual values (for demonstration purposes I've represented the timestamp value as just the time string)
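As a quick check of step 1 (a sketch, assuming the inbetween UDF registered in the example code below is available in the session and the statement is run through the same sqlContext): given two epoch-second values, the UDF returns every 5-minute step from the first up to, but excluding, the second, and explode turns that array into one row per step.

select explode(inbetween(unix_timestamp('2020-01-01 12:00:01'),
                         unix_timestamp('2020-01-01 12:12:20'))) as interval_time
-- three rows: the epoch seconds for 12:00:01, 12:05:01 and 12:10:01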

The code below shows how it was evaluated using Hive (via PySpark's sqlContext).

Example Code


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,ArrayType

# inbetween(min_value, max_value): epoch seconds from min_value up to (but excluding) max_value, in 5-minute steps
inbetween = lambda min_value, max_value: [*range(min_value, max_value, 5*60)]

udf_inbetween = udf(inbetween,ArrayType(IntegerType()))
sqlContext.udf.register("inbetween",udf_inbetween)

sqlContext.sql("""
WITH max_timestamp(t) as (
  select max(timestamp) as t from initial_data2
),
min_timestamp(t) as (
  select min(timestamp) as t from initial_data2
),
intervals as (
   select explode(inbetween(unix_timestamp(mint.t),unix_timestamp(maxt.t))) as interval_time FROM
   min_timestamp mint, max_timestamp maxt
),
unique_ids as (
  select distinct id from initial_data2
),
interval_times as (
   select interval_time from (
   select 
       cast(from_unixtime(interval_time) as timestamp) as interval_time 
   from 
       intervals
   UNION
   select distinct d.timestamp as interval_time from initial_data2 d
   )
   order by interval_time asc
),
possible_records as (
   select
      distinct 
      d.id,
      i.interval_time
   FROM
      interval_times i, unique_ids d
   
)
select 
    p.id,
    d.value,
    split(cast(p.interval_time as string)," ")[1] as timestamp
FROM
  possible_records p
LEFT JOIN
   initial_data2 d ON d.id = p.id and d.timestamp = p.interval_time

ORDER BY p.id, p.interval_time
""").show(20)


Output

+---+-----+---------+
| id|value|timestamp|
+---+-----+---------+
|  1|    3| 12:00:01|
|  1|    4| 12:03:30|
|  1| null| 12:05:01|
|  1| null| 12:10:01|
|  1|    5| 12:12:20|
|  1| null| 12:15:01|
|  1| null| 12:20:01|
|  1| null| 12:25:01|
|  1| null| 12:30:01|
|  1| null| 12:35:01|
|  1| null| 12:40:01|
|  1| null| 12:45:01|
|  1| null| 12:50:01|
|  1| null| 12:55:01|
|  1| null| 13:00:01|
|  1| null| 13:05:01|
|  1| null| 13:10:01|
|  1| null| 13:15:01|
|  1| null| 13:20:01|
|  1| null| 13:25:01|
+---+-----+---------+

only showing top 20 rows

Data Prep to replicate


from pyspark.sql import Row  # needed for the Row(...) construction below

raw_data1 = [
    {"id":1,"value":3,"timestam":"12:00:01"},
    {"id":1,"value":4,"timestam":"12:03:30"},
    {"id":1,"value":5,"timestam":"12:12:20"},
    {"id":1,"value":3,"timestam":"15:00:00"},
]
raw_data = [*map(lambda entry : Row(**entry),raw_data1)]

initial_data = sqlContext.createDataFrame(raw_data,schema="id int, value int, timestam string ")
initial_data.createOrReplaceTempView('initial_data')

sqlContext.sql("create or replace temp view initial_data2 as select id,value,cast(timestam as timestamp) as timestamp from initial_data")
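Since generating the intervals and the cross join are the expensive parts, the generated intervals could also be materialized once (per the note above about materialized views/tables) instead of being recomputed on every run. A sketch under assumptions: the table name interval_times_tbl is made up, and the statement would be run through the same sqlContext so the registered inbetween UDF is available.

CREATE TABLE interval_times_tbl AS
SELECT cast(from_unixtime(interval_time) AS timestamp) AS interval_time
FROM (
  SELECT explode(inbetween(unix_timestamp(mint.t), unix_timestamp(maxt.t))) AS interval_time
  FROM (SELECT min(timestamp) AS t FROM initial_data2) mint,
       (SELECT max(timestamp) AS t FROM initial_data2) maxt
) x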

Upvotes: 2
