thinktwice

Reputation: 157

How to let Kafka Streams send one record per key per 1hour window?

I'm writing a Kafka Streams app. It does the following steps: 1) consume the input data, 2) dedupe the records based on a new key within a 1-hour window, 3) re-select the key, 4) count the keys within a 1-hour window, 5) send the results downstream.

I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms to 1 hour as well. Is this the right thing to do?

Once I deployed my app with real traffic, the app kept sending messages continuously, whereas I expected it to only send out a batch of messages every hour.

Any help is appreciated!!

My config:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000    
cache.max.bytes.buffering = 10485760

// dedupe by new key per window(1hr)
stream = inputStream
        .selectKey(... )
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        // only keep the latest event for each customized key
        .reduce((event1, event2) -> event2)
        .toStream()
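        // count per new key per window(1hr) by summing the event counts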
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        .reduce((event1, event2) -> {
            long count1 = event1.getCount();
            long count2 = event2.getCount();
            event2.setCount(count1 + count2);
            return event2;
        })
        .toStream()
        .to(OUTPUT_TOPIC);

Upvotes: 3

Views: 1365

Answers (2)

Matthias J. Sax

Reputation: 62350

I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms to 1 hour as well. Is this the right thing to do?

The commit interval has nothing to do with your processing logic.

Kafka Streams' processing model is continuous, and it sends continuous result updates by default. That's why you basically get one output message per input message: processing an input message modifies the result.

You might want to look into the suppress() operator, which lets you hold back those intermediate updates until a window closes.
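As a minimal sketch (assuming Kafka Streams 2.1+; the topic names, key/value types, and serdes are hypothetical, not from the question), the counting stage could look like this:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();
    // hypothetical input topic with String keys and values
    KStream<String, String> input = builder.stream("input-topic",
            Consumed.with(Serdes.String(), Serdes.String()));

    input.groupByKey()
         // grace(ZERO): the window closes as soon as stream time passes its end
         .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ZERO))
         .count()
         // buffer all intermediate updates; emit one final count per key per window
         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream((windowedKey, count) -> windowedKey.key())
         .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Note that suppress() emits based on stream time, not wall-clock time: a window's final result only appears once newer records advance stream time past the window end plus the grace period.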

Upvotes: 2

JR ibkr

Reputation: 919

  1. I would recommend using the exactly-once guarantee provided by recent versions of Kafka. With it you won't have to worry about de-duplicating messages: https://www.baeldung.com/kafka-exactly-once
  2. Configure the producer configs, specifically buffer.memory and linger.ms (you can also check batch.size); see https://kafka.apache.org/documentation/#producerconfigs for more information. A sketch of both settings follows below.
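A minimal sketch of how these settings could be applied in a Kafka Streams app (the application id, bootstrap servers, and tuning values are hypothetical):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
    // exactly-once processing (requires Kafka 0.11+ brokers)
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // producer tuning is passed through to the internal producer via the "producer." prefix
    props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 33554432L);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 65536);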

Upvotes: -1
