ViviZa

Reputation: 31

Mix of state stores and partitions on Kafka Streams instances

I built a Kafka Streams application with a state store, and now I am trying to scale it. When I run the application on three different servers, Kafka splits up the partitions and state stores randomly.

For example:

Instance1 gets: partition-0, partition-1

Instance2 gets: partition-2, stateStore-repartition-0

Instance3 gets: stateStore-repartition-1, stateStore-repartition-2

I want to assign one stateStore and one partition per instance. What am I doing wrong?

My KafkaStreams Config:

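import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

try {
    // Each start creates a new temp directory, so local state is not reused across restarts
    properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
            Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
    // Fall back to the default state.dir
}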

And my stream is:

stream.groupByKey()
      .windowedBy(TimeWindows.of(timeWindowDuration))
      .<TradeStats>aggregate(
          () -> new TradeStats(),
          (k, v, tradeStats) -> tradeStats.add(v),
          Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
              .withValueSerde(new TradeStatsSerde()))
      .toStream();

Upvotes: 1

Views: 448

Answers (1)

mle

Reputation: 2550

From what I can see so far (as mentioned in my comment on your question, please share your state store definition), everything is fine, and I suspect a slight misconception on your side regarding your question:

What am I doing wrong?

Basically, nothing. :-)

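For the partition part of your question: Kafka Streams does not hand out partitions one by one. It groups them into tasks (roughly one per partition of each sub-topology's input topics) and distributes these tasks across the instances with its own built-in assignor (StreamsPartitionAssignor), which aims to balance the number of tasks per instance. For plain consumers this job is done by the configured assignor (consult https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html or adjacent interfaces).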
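
To illustrate why one instance can end up with partition-2 and stateStore-repartition-0: with two sub-topologies and three partitions each there are six tasks, and an assignor that only balances task counts has no reason to pair them up. The toy model below is a sketch under loudly stated assumptions, not Kafka's StreamsPartitionAssignor: it simply fills instances in contiguous chunks and ignores stickiness, standby replicas and restoration lag. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ToyTaskAssignment {

    // Toy model only, NOT Kafka's StreamsPartitionAssignor. It mimics a single goal of the
    // real assignor (equal task counts per instance) and ignores everything else.
    static Map<String, List<String>> assign(List<String> tasks, List<String> instances) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        // Ceiling division: maximum number of tasks any instance receives
        int perInstance = (tasks.size() + instances.size() - 1) / instances.size();
        for (int i = 0; i < tasks.size(); i++) {
            // Fill instances in contiguous chunks of perInstance tasks
            String instance = instances.get(i / perInstance);
            result.computeIfAbsent(instance, k -> new ArrayList<>()).add(tasks.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Task names follow <subtopology>_<partition>: sub-topology 0 reads the input
        // topic, sub-topology 1 reads the repartition topic that feeds the state store.
        List<String> tasks = List.of("0_0", "0_1", "0_2", "1_0", "1_1", "1_2");
        List<String> instances = List.of("Instance1", "Instance2", "Instance3");
        assign(tasks, instances).forEach((instance, assigned) ->
                System.out.println(instance + " gets: " + assigned));
    }
}
```

Running main prints Instance1 gets: [0_0, 0_1], Instance2 gets: [0_2, 1_0] and Instance3 gets: [1_1, 1_2], the same shape as the layout in the question (task 0_N reads partition-N, task 1_N reads stateStore-repartition-N). Every instance gets two tasks, so the load is balanced even though the pairs are split.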

For the state store part of your question: Maybe there is a small misconception here about how state stores work. They are usually backed by a changelog topic, which does not reside on your application host(s) but in the Kafka cluster itself. Each application host keeps only the part of the whole state store that belongs to its assigned tasks, in a local store (by default RocksDB, which keeps its data on local disk rather than purely in memory). These local stores are only slices of the complete state, which is maintained in the changelog topic in the Kafka cluster. Also note that the stateStore-repartition-N entries in your question are partitions of an internal repartition topic, not the store itself; the store slice for partition N lives with the task that reads stateStore-repartition-N.
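
To see where a repartition topic comes from, you can print the topology description; this needs no running broker. The following is a sketch under assumptions: the input topic name "trades" and the selectKey step are hypothetical (a repartition topic only appears when the key was changed before groupByKey()), count() stands in for the TradeStats aggregate, and TimeWindows.of(Duration) is the API used in the question (deprecated since Kafka Streams 3.0).

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class TopologyPeek {

    // Stand-in for the question's topology. Assumptions: hypothetical input topic "trades"
    // and a hypothetical re-keying step, which makes groupByKey() insert a repartition topic.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.<String, String>stream("trades")
                .selectKey((k, v) -> v); // hypothetical: re-key by value
        stream.groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
              // count() replaces the TradeStats aggregate to keep the sketch self-contained
              .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("stateStore"))
              .toStream();
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the sub-topologies with their source/sink topics and state stores
        System.out.println(buildTopology().describe());
    }
}
```

The description should show two sub-topologies: sub-topology 0 reads the input topic and writes to the repartition topic, and sub-topology 1 reads the repartition topic and hosts the window store. With three partitions each, every sub-topology becomes three tasks, which is why three instances end up sharing six tasks.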

So in a nutshell: everything is fine. Let Kafka do the assignment and interfere with it only if you have a really special use case or a good reason. :-) Kafka also takes care of redundancy and rebalances all partitions if one of your application hosts fails.
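
How much redundancy you actually get is configurable. A sketch, assuming a Properties object like the one in the question: replication.factor controls how many brokers hold each internal changelog and repartition topic (depending on your Kafka Streams version, the default may be as low as 1), and num.standby.replicas (default 0) keeps warm copies of state stores on other instances for faster failover. The values and the helper name withRedundancy are illustrative assumptions, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RedundancyConfig {

    // Hypothetical helper: adds redundancy-related settings to existing Streams properties.
    // The values 3 and 1 are illustrative; size them to your cluster.
    static Properties withRedundancy(Properties properties) {
        // Replicate internal changelog and repartition topics across 3 brokers
        properties.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
        // Keep one standby copy of each state store on another instance
        properties.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        System.out.println(withRedundancy(properties));
    }
}
```

Standby replicas do not change which instance processes a task; they only shorten restoration when a task has to move after a failure.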

If you still want to do the assignment yourself, please share your use case so we can help further.

Upvotes: 1
