Prashant Bhardwaj

Reputation: 176

Why Kafka Streams creates topics for aggregations and joins

I recently created my first Kafka Streams application for learning. I used spring-cloud-stream-kafka-binding. It is a simple eCommerce system in which I read a topic called products, which gets a new entry whenever new stock of a product comes in. I aggregate the quantities to get the total quantity of each product.

I had two choices -

  1. Send the aggregated details (the KTable) to another Kafka topic called aggregated-products
  2. Materialize the aggregated data

I opted for the second option, and I found that the application created a Kafka topic by itself; when I consumed messages from that topic, I got the aggregated messages.

// productStream is the KStream<String, Product> read from the products topic (variable name assumed)
productStream
        .peek((k, v) -> LOGGER.info("Received product with key [{}] and value [{}]", k, v))
        .groupByKey()
        .aggregate(Product::new,
                (key, value, aggregate) -> aggregate.process(value),
                Materialized.<String, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_AGGREGATE_STATE_STORE)
                        .withValueSerde(productEventSerde)
                        // .withKeySerde(keySerde) is not needed because keySerde is configured in application.properties
        );
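
For comparison, option 1 would have looked roughly like this sketch (not my actual code; the aggregatedTable variable name and the Produced serdes are assumptions):

// Hypothetical sketch of option 1: stream the aggregated KTable out to another topic.
// "aggregatedTable" stands for the KTable<String, Product> returned by aggregate() above.
aggregatedTable
        .toStream()
        .to("aggregated-products", Produced.with(Serdes.String(), productEventSerde));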

Using InteractiveQueryService, I am able to access this state store in my application to find out the total quantity available for a product.
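
The lookup looks roughly like this (a sketch only; the class name, bean wiring, and the Product accessor are assumptions):

@Component
public class ProductQuantityQuery {

    @Autowired
    private InteractiveQueryService interactiveQueryService; // provided by the Kafka Streams binder

    public Product currentAggregate(String productKey) {
        // The store name must match the one used in Materialized.as(...) above.
        ReadOnlyKeyValueStore<String, Product> store =
                interactiveQueryService.getQueryableStore(
                        PRODUCT_AGGREGATE_STATE_STORE, QueryableStoreTypes.keyValueStore());
        return store.get(productKey); // null if no stock has been recorded for this key
    }
}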

Now I have a few questions -

  1. Why did the application create a new Kafka topic?
  2. If the answer is "to store the aggregated data", how is that different from option 1, in which I could have sent the aggregated data myself?
  3. Where does RocksDB come into the picture?

The code of my application (which does more than what I explained here) can be accessed from this link -

https://github.com/prashantbhardwaj/kafka-stream-example/blob/master/src/main/java/com/appcloid/kafka/stream/example/config/SpringStreamBinderTopologyBuilderConfig.java

Upvotes: 0

Views: 891

Answers (1)

Lucas Brutschy

Reputation: 730

The internal topics you are seeing are called changelog topics and are used for fault tolerance. The state of the aggregation is stored both locally on disk, using RocksDB, and on the Kafka brokers in the form of a changelog topic, which is essentially a "backup": every update applied to the local RocksDB store is also written to the changelog topic.

If a task is moved to a new machine, or the local state is lost for some other reason, Kafka Streams can restore the local state by reading all changes from the changelog topic and applying them to a fresh RocksDB instance. After restoration has finished (i.e. the whole changelog topic has been processed), the new machine holds the same state and can continue processing where the old one stopped. There are a lot of intricate details to this (e.g. with the default settings, the state can be updated twice for the same input record when failures happen).
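
The name of the changelog topic follows the pattern <application.id>-<store name>-changelog. A minimal sketch of what that means for the store in the question (the application id and store name values below are just placeholders):

// Kafka Streams derives the changelog topic name from the application id and the store name:
// "<application.id>-<store name>-changelog"
String applicationId = "spring-stream-example";      // placeholder; whatever application.id the app uses
String storeName = "product-aggregate-state-store";  // placeholder; the value of PRODUCT_AGGREGATE_STATE_STORE
String changelogTopic = applicationId + "-" + storeName + "-changelog";
// e.g. "spring-stream-example-product-aggregate-state-store-changelog"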

See also https://developer.confluent.io/learn-kafka/kafka-streams/stateful-fault-tolerance/

Upvotes: 2
