other15

Reputation: 849

Kafka Streams window batching

Coming from a Spark Streaming background, I'm getting a grasp on Kafka Streams.

I have a simple Spark Streaming application that reads from Kafka and returns the latest event per user in that minute.

Sample events would look like {"user": 1, "timestamp": "2018-05-18T16:56:30.754Z", "count": 3}, {"user": 1, "timestamp": "2018-05-22T16:56:39.754Z", "count": 4}

I'm interested in how this would work in Kafka Streams, as it seems that there is an output for each event, whereas my use case is to reduce the traffic.

From my reading so far it seems that this is not straightforward and that you would have to use the Processor API.

Ideally I would like to use the DSL instead of the Processor API, as I am just starting to look at Kafka Streams, but it seems that I would have to use the Processor API's punctuate method to read from a state store every n seconds?

I'm using Kafka 0.11.0.

Upvotes: 1

Views: 1001

Answers (2)

yuranos

Reputation: 9725

I haven't tried it myself yet, but Kafka Streams now supports the suppress operation. Take a look here:

You can use Suppress to effectively rate-limit just one KTable output or callback. Or, especially valuable for non-retractable outputs like alerts, you can use it to get only the final results of a windowed aggregation. The clearest use case for Suppress at the moment is getting final results.

Based on the article, the code can look like:

events
  .groupByKey()
  .windowedBy(
    TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ofMinutes(2))
  )
  .count(Materialized.as("count-metric"))
  .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
  .filter( _ < 4 )
  .toStream()
  .foreach( /* Send that email! */ )

I'm using Kafka Streams 2.6.0 and was able to use the same approach to build a stream.
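Applied to the question's use case of keeping the latest event per user per minute, a sketch along the same lines could look like the one below. The Event class, its Serde, the topic names, and the grace period are all assumptions; the reduce simply keeps whichever event carries the later timestamp:

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class LatestPerUserPerMinute {

    // Hypothetical event type mirroring the sample JSON; a matching JSON Serde is assumed.
    public static class Event {
        public int user;
        public Instant timestamp;
        public int count;
    }

    public static void build(StreamsBuilder builder, Serde<Event> eventSerde) {
        builder.<Integer, Event>stream("events", Consumed.with(Serdes.Integer(), eventSerde))
            .groupByKey()
            // one-minute windows; the grace period for late events is an arbitrary choice here
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
            // keep whichever event in the window has the later timestamp
            .reduce((left, right) -> right.timestamp.isAfter(left.timestamp) ? right : left)
            // hold updates back until the window closes, so only one record per user and window is emitted
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // unwrap the windowed key so the output is keyed by user again
            .map((windowedUser, latest) -> KeyValue.pair(windowedUser.key(), latest))
            .to("latest-per-user-per-minute", Produced.with(Serdes.Integer(), eventSerde));
    }
}

Note that suppress was only added in Kafka Streams 2.1, so this needs a newer client than the 0.11.0 mentioned in the question.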

Upvotes: 0

Matthias J. Sax

Reputation: 62350

At the DSL level, Kafka Streams allows you to configure KTable caches (enabled by default) that reduce the downstream load. The cache is an LRU cache that is flushed regularly. Thus, while the cache reduces the downstream load, it does not guarantee how many outputs per window you get. (cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)
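For reference, the cache behavior is controlled by two settings, cache.max.bytes.buffering and commit.interval.ms; the values below are only illustrative:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-per-user");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// a larger cache and a longer commit interval mean fewer downstream updates,
// but still no guarantee of a single output per window
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);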

If you strictly need a single output per window, using the Processor API is the right way to go.
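As a rough illustration of that Processor API route (not the only way to do it), a Transformer can buffer the latest event per user in a key-value store and forward the buffered entries once per minute from a wall-clock punctuation. The store name and types are assumptions, and the Duration-based schedule overload assumes a client newer than 0.11.0, whose punctuation API differs:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers the most recent event (raw value) per user and emits the buffered
// entries once per minute; nothing is forwarded for the individual events.
public class LatestPerUserTransformer implements Transformer<Integer, String, KeyValue<Integer, String>> {

    private ProcessorContext context;
    private KeyValueStore<Integer, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "latest-events" is a hypothetical store that has to be added to the topology
        this.store = (KeyValueStore<Integer, String>) context.getStateStore("latest-events");
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<Integer, String> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<Integer, String> entry = it.next();
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public KeyValue<Integer, String> transform(Integer user, String event) {
        store.put(user, event); // overwrite, so only the latest event per user survives
        return null;            // suppress per-event output
    }

    @Override
    public void close() { }
}

It would be wired in with something like builder.addStateStore(...) for the store and stream.transform(LatestPerUserTransformer::new, "latest-events").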

Upvotes: 3
