Manjunath H S

Reputation: 23

Kafka Streams DSL: application lags for windowed aggregation

We have the following use case:

Read from a topic (expected throughput is one record every 2 seconds per key), groupByKey, and do a windowed aggregation with a 30-minute window and a 1-minute hopping period. The aggregation simply appends the received records.

When the application starts, everything works fine, but later, as the aggregate grows, the application slows down and starts lagging.

Topology:

// Class-level constants: window size and hop (advance) interval
static final Duration WINDOW_MS = Duration.ofMinutes(30);
static final Duration ADVANCE_MS = Duration.ofMinutes(15);

// Inside the topology-building method
KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));

KStream<Windowed<String>, Foo1> windowedStream = numericStream
        .peek((key, value) -> System.out.println(value.getDateTime()))
        .groupByKey()
        .windowedBy(TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS).grace(Duration.ofMillis(30)))
        .aggregate(
                Foo1::new,                        // initializer: empty aggregate for each new window
                (key, value, aggregate) -> {      // aggregator: append every record to the window's aggregate
                    aggregate.append(value);
                    return aggregate;
                },
                Materialized.<String, Foo1, WindowStore<Bytes, byte[]>>as("some_name").withValueSerde(Foo1Serde))
        .toStream()
        .peek((key, value) -> System.out.println(" Key: " + key
                + " Start: " + getISTTime(key.window().start())
                + " End: " + getISTTime(key.window().end())
                + " Count: " + value.getCount()));
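To see how many aggregates each incoming record is appended to, here is a standalone sketch (not Kafka Streams code) of epoch-aligned hopping-window assignment. The class and method names are hypothetical; the rounding assumes windows are aligned to the epoch, as Kafka Streams hopping windows are. With a 30-minute window, the 15-minute advance used in the code puts each record into 2 windows, while the 1-minute hop from the description would put it into 30.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class HoppingWindows {

    /** Start timestamps (ms) of every epoch-aligned hopping window that contains ts. */
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start that still covers ts, rounded down to a multiple of advanceMs.
        long first = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= ts; start += advanceMs) {
            starts.add(start); // window [start, start + sizeMs) contains ts
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(30).toMillis();
        long ts = Duration.ofHours(5).plusMinutes(7).toMillis(); // arbitrary record timestamp

        // 15-minute advance (as in the code): each record lands in 2 windows.
        System.out.println(windowStartsFor(ts, size, Duration.ofMinutes(15).toMillis()).size());
        // 1-minute advance (as in the description): each record lands in 30 windows.
        System.out.println(windowStartsFor(ts, size, Duration.ofMinutes(1).toMillis()).size());
    }
}
```

Every window a record falls into gets its own append, so a shorter hop multiplies the per-record work.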

The size of each record is around 20 KB. Once the aggregate grows beyond about 10 MB, processing a single record takes more than 2 seconds, which is longer than the interval between records, and hence the lag.
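A back-of-the-envelope sketch of these numbers, assuming decimal units (20 KB = 20,000 bytes), one record per key every 2 seconds, and assuming that every update deserializes, appends to, re-serializes and writes back the whole window aggregate. The class and method names are hypothetical.

```java
class AggregateGrowth {
    static final long RECORD_BYTES = 20_000;       // ~20 KB per record (decimal units assumed)
    static final long SECONDS_PER_RECORD = 2;      // one record per key every 2 seconds
    static final long WINDOW_SECONDS = 30 * 60;    // 30-minute window

    /** Number of records one key contributes to a single window. */
    static long recordsPerWindow() {
        return WINDOW_SECONDS / SECONDS_PER_RECORD;
    }

    /** Size of a window's aggregate once the window is complete. */
    static long finalAggregateBytes() {
        return recordsPerWindow() * RECORD_BYTES;
    }

    /** Seconds of input (per key) until a window's aggregate reaches the given size. */
    static long secondsToReach(long bytes) {
        return bytes / RECORD_BYTES * SECONDS_PER_RECORD;
    }

    /** Total bytes serialized for one window if every update rewrites the whole aggregate. */
    static long cumulativeRewriteBytes() {
        long n = recordsPerWindow();
        // The k-th update rewrites k records' worth of data: RECORD_BYTES * (1 + 2 + ... + n).
        return RECORD_BYTES * n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println("records per window:       " + recordsPerWindow());
        System.out.println("final aggregate (bytes):  " + finalAggregateBytes());
        System.out.println("seconds until 10 MB:      " + secondsToReach(10_000_000));
        System.out.println("bytes rewritten / window: " + cumulativeRewriteBytes());
    }
}
```

Under these assumptions a window ends at about 18 MB per key, the 10 MB mark is reached after roughly 1,000 seconds (about 17 minutes), and the total data rewritten grows quadratically to about 8.1 GB per key per window, before multiplying by the number of windows each record falls into.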

COMMIT_INTERVAL_MS_CONFIG is set to 0 because the state store should always be up to date with the latest record, and the state store is queried at different intervals.

  1. How can we remove the application's lag? Is it related to RocksDB I/O? Using count instead of aggregate shows no lag whatsoever.

  2. Each topic has 3 partitions, but records with the same key go to the same partition. Will multiple threads or multiple instances help?

  3. We are also considering doing this without windowing. Does windowing cause this kind of lag for large aggregates?

Upvotes: 2

Views: 1220

Answers (1)

Bruno Cadonna

Reputation: 1418

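  1. Since you write and read increasingly large data to and from RocksDB, processing may slow down. For every incoming record, the aggregate of each window the record falls into is fetched from the state store, deserialized, appended to, serialized again and written back, so the cost per record grows with the size of the aggregate. A count, by contrast, stays a single number no matter how many records it has seen, which is consistent with count showing no lag.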

  2. Yes, using three threads in one instance or starting three instances with one thread each might also help in this case. With your topology and three partitions, processing is distributed over three tasks. If you have only one instance with one thread, all three tasks are run by the same thread. You can scale up by running one instance with three threads, or scale out by starting three instances with one thread each on different machines. Settings in between, such as two instances where one has two threads and the other has one, also work. Note that all records with the same key are still processed by a single task, so this only helps if your keys are spread over the three partitions.

  3. Without windowing, the aggregates would never expire and never be removed from the state stores. Thus, data in your state stores would grow indefinitely and probably slow down the state store.
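To make point 3 concrete, a rough sketch under the same assumptions as the question (one ~20 KB record per key every 2 seconds, decimal units). It only models the size of a single per-key aggregate; how long closed windows stay in the store depends on the store's retention and is not modeled here. The class and method names are hypothetical.

```java
class UnwindowedGrowth {
    static final long RECORD_BYTES = 20_000;       // ~20 KB per record (decimal units assumed)
    static final long SECONDS_PER_RECORD = 2;      // one record per key every 2 seconds
    static final long WINDOW_SECONDS = 30 * 60;    // 30-minute window

    /** Size of a per-key aggregate that never expires, after elapsedSeconds of input. */
    static long unwindowedBytes(long elapsedSeconds) {
        return elapsedSeconds / SECONDS_PER_RECORD * RECORD_BYTES;
    }

    /** Size of one window's aggregate: it never holds more than one window's worth of records. */
    static long windowedBytes(long elapsedSeconds) {
        return Math.min(elapsedSeconds, WINDOW_SECONDS) / SECONDS_PER_RECORD * RECORD_BYTES;
    }

    public static void main(String[] args) {
        long oneDay = 24 * 60 * 60;
        System.out.println("unwindowed after 1 day: " + unwindowedBytes(oneDay) + " bytes");
        System.out.println("one window's aggregate: " + windowedBytes(oneDay) + " bytes");
    }
}
```

After one day, an unwindowed aggregate would hold about 864 MB for a single key and keep growing, while one 30-minute window's aggregate stays at about 18 MB.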

If you use Interactive Queries to query your state store, you do not need to set COMMIT_INTERVAL_MS_CONFIG to 0, since Interactive Queries also query the cache in front of the state store. In fact, setting COMMIT_INTERVAL_MS_CONFIG to 0 may also slow down your processing: every commit flushes the cache to the state store, so you continuously write data to disk and increase disk I/O.
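A minimal configuration sketch for the thread and commit-interval suggestions, assuming plain java.util.Properties and the string keys behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG ("commit.interval.ms") and StreamsConfig.NUM_STREAM_THREADS_CONFIG ("num.stream.threads"). The application id and broker address are placeholders, and the class name is hypothetical.

```java
import java.util.Properties;

class StreamsSettings {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "windowed-aggregation-app"); // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");        // placeholder broker address
        // Keep the default commit interval (30000 ms under at-least-once) instead of 0;
        // Interactive Queries still see the latest values through the cache.
        props.put("commit.interval.ms", "30000");
        // One stream thread per input partition (three tasks for a three-partition topic).
        props.put("num.stream.threads", "3");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```

Since 30000 ms is already the default under at-least-once processing, the explicit commit-interval line could also simply be left out.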

Upvotes: 3
