x97Core

Reputation: 1514

Rolling up events to a minute in Apache Beam

I have a stream of data with the following structure (apologies for expressing it in SQL):

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp 
);

In SQL, I would roll this data up to the minute like this:

1. Create a roll-up table for this purpose:

CREATE TABLE github_events_rollup_minute
(
    created_at timestamp,
    event_count bigint
);

2. Populate it with INSERT/SELECT:

INSERT INTO github_events_rollup_minute(
    created_at,
    event_count
)
SELECT
    date_trunc('minute', created_at) AS created_at,
    COUNT(*) AS event_count
FROM github_events
GROUP BY 1;

In Apache Beam, I am trying to roll up events to the minute, i.e. count the total number of events in each minute according to the event's timestamp field. The output would have this form:

Timestamp (in YYYY-MM-DDThh:mm): event_count

If more events for an already-counted minute arrive later in the pipeline (events can be delayed, for example because the customer was offline), we just need to take the new roll-up count and add it to the existing count for that timestamp.

This will allow us to simply increment the count for YYYY-MM-DDThh:mm by event_count in the application.

Assume that events might be delayed, but they will always have the timestamp field.
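To make the intended semantics concrete, here is a minimal sketch in plain Python (not Beam). The helper names `rollup_minute` and `apply_rollup` are hypothetical and only illustrate the "count per minute, then increment on late arrivals" behavior described above.

```python
from collections import Counter
from datetime import datetime


def rollup_minute(timestamps):
    """Count events per minute, keyed by 'YYYY-MM-DDThh:mm'."""
    return Counter(ts.strftime("%Y-%m-%dT%H:%M") for ts in timestamps)


def apply_rollup(totals, rollup):
    """Increment the stored totals by a (possibly late) partial roll-up."""
    totals.update(rollup)  # Counter.update adds counts instead of replacing
    return totals


totals = Counter()

# First batch: two events in 10:05, one in 10:06.
first = [
    datetime(2018, 1, 1, 10, 5, 3),
    datetime(2018, 1, 1, 10, 5, 40),
    datetime(2018, 1, 1, 10, 6, 1),
]
apply_rollup(totals, rollup_minute(first))

# A delayed event for 10:05 arrives later and increments that minute.
late = [datetime(2018, 1, 1, 10, 5, 59)]
apply_rollup(totals, rollup_minute(late))
```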

I would like to accomplish the same thing in Apache Beam. I am very new to Apache Beam, and I feel that I am missing something that would allow me to do this, even though I have read the Apache Beam Programming Guide multiple times.

Upvotes: 1

Views: 493

Answers (1)

Scott Wegner

Reputation: 7493

Take a look at the Windowing and Triggers sections of the Beam Programming Guide. What you're describing is fixed-time windows with allowed late data. The general shape of the pipeline would be:

  1. Read input github_events data
  2. Window into fixed windows of 1 minute, allowing late data
  3. Count events per window
  4. Output the result to github_events_rollup_minute

The WindowedWordCount example project demonstrates this pattern.

Upvotes: 1
