Reputation: 157
I'm writing a Kafka Streams app. It does the following steps: 1) consume the input data, 2) dedupe records based on a new key within a 1-hour window, 3) reselect the key, 4) count by key within a 1-hour window, 5) send the results downstream.
I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms
to 1 hour as well. Is this the right thing to do?
Once I deployed my app with real traffic, it kept sending messages continuously, while I thought it would only send out a batch of messages every hour.
Any help is appreciated!!
My config:
commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000
cache.max.bytes.buffering = 10485760
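(For context, not part of the original post: these settings go into the Properties object passed to KafkaStreams; a minimal sketch, with placeholder application id and bootstrap servers:)

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedupe-count-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3600000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);
props.put("request.timeout.ms", 600000);
props.put("retries", 20);
props.put("retry.backoff.ms", 1000);

StreamsBuilder builder = new StreamsBuilder();
// ... topology below is defined on builder, producing inputStream ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();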
// dedupe by new key per window(1hr)
stream = inputStream
.selectKey(... )
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
// only keep the latest event for each customized key
.reduce((event1, event2) -> event2)
.toStream()
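// re-group and sum event counts per key within a 1-hour window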
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
.reduce((event1, event2) -> {
long count1 = event1.getCount();
long count2 = event2.getCount();
event2.setCount(count1 + count2);
return event2;
})
.toStream()
.to(OUTPUT_TOPIC);
Upvotes: 3
Views: 1365
Reputation: 62350
I'm new to Kafka Streams. My understanding is that, in order to keep the window at 1 hour, I should set commit.interval.ms to 1 hour as well. Is this the right thing to do?
The commit interval has nothing to do with your processing logic.
You might want to look into the suppress()
operator. Also, the following blog post might help:
Kafka Streams' processing model is continuous, and it sends continuous result updates by default. That's why you basically get one output message per input message: each input message modifies the result, and the updated result is emitted downstream.
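To illustrate (not from the original answer): a minimal sketch of how the dedupe stage could use suppress() to emit only one final result per window, assuming Kafka Streams 2.1+; the key selector newKeyFor() and the 5-minute grace period are placeholders:

import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

// dedupe by new key per 1-hour window, but hold results back until the window closes
inputStream
    .selectKey((key, event) -> newKeyFor(event))            // newKeyFor() is a placeholder
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1))
                           .grace(Duration.ofMinutes(5)))   // bound lateness explicitly
    .reduce((event1, event2) -> event2)                     // keep the latest event per key
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .toStream();

Note that untilWindowCloses() only emits once stream time has passed the window end plus the grace period, so without an explicit grace() the default grace (which used to be roughly 24 hours) can make output appear much later than expected.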
Upvotes: 2