Ajay C

Reputation: 66

Aggregating data in Upsolver and using Athena output to Upsert in Athena

I'm receiving a Kafka stream that I need to aggregate and load into Athena. As each event arrives, the aggregates should update to reflect the new event. I want to reuse this aggregated data for multiple outputs, so I used an Upsolver intermediate output to construct the aggregated data first, and then created multiple Athena and Redshift outputs that Upsert from this intermediate output. Since ingestion happens per minute, the issue is that each time a new event arrives, it overwrites the aggregates with data from that minute only, instead of the total aggregates over all data processed so far. How do I keep the running totals?

Upvotes: 0

Views: 29

Answers (1)

Ajay C

Reputation: 66

By default, an intermediate Upsolver output only processes data as it is ingested, so each one-minute batch of events is aggregated on its own. Since you want to aggregate all data so far, you need to add a WINDOW clause.

    SELECT id, col1, count(col2)
    FROM table
    GROUP BY id, col1
    WINDOW 31 DAYS

You can use whatever window you need. In this case, the intermediate output maintains aggregates over the past 31 days.

Example: with a window of 31 days, let's say these are the events you received. date is the attribute in your event that shows when the event actually happened; time is the Upsolver attribute that shows when the event was ingested into Upsolver. You are counting events per id.

    id=1 date=11/30/2022 time=11/30/2022       count=1
    id=1 date=12/15/2022 time=12/15/2022       count=2
    id=1 date=12/20/2022 time=12/20/2022       count=3
    id=1 date=12/31/2022 time=12/31/2022       count=4

Because each of these events falls within the 31-day window, the count kept updating. Now let's say a very late event arrives. Your business case may treat this as impossible, but it can still happen: even though no event was supposed to arrive more than 31 days late, one does.

    id=1 date=12/10/2022 time=2/1/2023

This event is far later than anything you accounted for. It may never happen, but if it does, the output re-aggregates over the 31 days up to 2/1/2023 and produces a new count of just 1, because this is the only event in that window. You can add the WHERE clause below to make the query fail-safe.

    SELECT id, col1, count(col2)
    FROM table
    WHERE DATE_DIFF_PRECISE('day', TO_DATE("date"), TO_DATE(time)) <= 31
    GROUP BY id, col1
    WINDOW 31 DAYS
    // Here 31 is your window; change it to match your own
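
To make the window arithmetic concrete, here is a small Python sketch that replays the five example events for id=1. It is only a simplified model of the behaviour described above, not how Upsolver actually implements windows: the helper names (`windowed_count`, `is_within_window`, `replay`) are made up for illustration, and it assumes the window is inclusive and measured on ingestion time.

```python
from datetime import date, timedelta

WINDOW = timedelta(days=31)  # mirrors WINDOW 31 DAYS in the query

# (event date, ingestion time) pairs for id=1, taken from the example
events = [
    (date(2022, 11, 30), date(2022, 11, 30)),
    (date(2022, 12, 15), date(2022, 12, 15)),
    (date(2022, 12, 20), date(2022, 12, 20)),
    (date(2022, 12, 31), date(2022, 12, 31)),
    (date(2022, 12, 10), date(2023, 2, 1)),  # the very late event
]

def windowed_count(accepted, now):
    """Count accepted events ingested within WINDOW of `now` (inclusive)."""
    return sum(1 for _, ingested in accepted if now - WINDOW <= ingested <= now)

def is_within_window(event_date, ingested):
    """Model of the WHERE filter: keep events that are at most 31 days late."""
    return (ingested - event_date).days <= 31

def replay(events, use_filter):
    """Return the count that would be upserted after each accepted event."""
    accepted, upserts = [], []
    for event_date, ingested in events:
        if use_filter and not is_within_window(event_date, ingested):
            continue  # dropped: nothing is upserted, the previous count survives
        accepted.append((event_date, ingested))
        upserts.append(windowed_count(accepted, ingested))
    return upserts

without_filter = replay(events, use_filter=False)
with_filter = replay(events, use_filter=True)
print(without_filter)
print(with_filter)
```

Without the filter, the last upsert in this model resets the count to 1. With the filter, the late event (53 days late) is dropped and the last upserted count stays at 4.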

Upvotes: 1
