snow_leopard

Reputation: 1556

How does Kafka Streams handle distributed data?

I have gone through various tutorials but am still not clear on two aspects of Kafka Streams. Let's take the word count example from: https://docs.confluent.io/current/streams/quickstart.html

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The builder used to define the processing topology.
final StreamsBuilder builder = new StreamsBuilder();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.  The text lines are the message
    // values, i.e. we can ignore whatever data is in the message keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // We use `groupBy` to ensure the words are available as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

A couple of questions here:
1.) Since there are no keys in the original stream, occurrences of the same word can end up on two different nodes because they might fall into different partitions, so the true count would be the aggregation from both of them. It does not seem to be done here? Do the different nodes serving the same topic's partitions coordinate here to aggregate the count?
2.) As a new stream is generated by each operation (e.g. flatMapValues, groupBy etc.), are the partitions recalculated for the messages in these substreams so that they end up on different nodes?

I would appreciate any help here!

Upvotes: 1

Views: 196

Answers (1)

miguno

Reputation: 15087

1.) Since there are no keys in the original stream, occurrences of the same word can end up on two different nodes because they might fall into different partitions, so the true count would be the aggregation from both of them. It does not seem to be done here?

It is done here. This is the relevant code:

// We use `groupBy` to ensure the words are available as message keys
.groupBy((key, value) -> value)

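Here, the words become the new message keys, which means the data is re-partitioned by word so that all occurrences of a given word end up in one and the same partition. The count for that word is therefore computed in a single place.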
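To illustrate why a key pins a record to a single partition, here is a minimal sketch in plain Java with no Kafka dependency. The class `PartitionSketch` and the method `partitionFor` are hypothetical names made up for this illustration. Kafka's default partitioner hashes the serialized key with murmur2; the sketch substitutes `Arrays.hashCode` for brevity, so the partition numbers it prints will not match what Kafka would choose. The point is only that the mapping is deterministic per key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Hypothetical stand-in for key-based partitioning: hash the serialized
    // key and take it modulo the number of partitions. (Assumption: Kafka's
    // real default partitioner uses murmur2, not Arrays.hashCode.)
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String[] words = {"hello", "kafka", "hello", "streams", "kafka"};
        for (String word : words) {
            // Repeated words always print the same partition number.
            System.out.println(word + " -> partition " + partitionFor(word, numPartitions));
        }
    }
}
```

Because the partition depends only on the key and the partition count, every record keyed by the same word is routed to the same partition, regardless of which input partition the original text line came from.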

Do the different nodes serving the same topic's partitions coordinate here to aggregate the count?

No, they don't. A partition is processed by one node only (more precisely: by one stream task only, see below).

2.) As a new stream is generated by each operation (e.g. flatMapValues, groupBy etc.), are the partitions recalculated for the messages in these substreams so that they end up on different nodes?

I'm not sure I understand your question, notably the "recalculated" part. Operations such as aggregations are always performed per partition, and Kafka Streams maps partitions to stream tasks (slightly simplified: a partition is always processed by one and only one stream task). Stream tasks are executed by the various instances of your Kafka Streams application, which typically run in different containers, VMs, or machines. Data is re-partitioned only when an operation needs it to produce the expected result (see question #1 and the answer above): here, groupBy changes the key, so the records are re-partitioned before count(), whereas flatMapValues leaves the key unchanged and does not trigger re-partitioning. Perhaps that is what you mean by "recalculated".
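The per-partition behaviour can be simulated without Kafka. The sketch below is an illustration only, not how Kafka Streams is implemented: `RepartitionCountSketch`, `partitionFor`, and `countPerTask` are hypothetical names, `String.hashCode` stands in for the real partitioner, and in-memory lists stand in for topics. It splits lines from arbitrary input partitions into words, routes each word by its new key, and then lets each "task" count only its own partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionCountSketch {
    // Hypothetical stand-in for key-based partitioning.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // inputPartitions: text lines per input partition (keys are ignored).
    // Returns one local count map per re-partitioned partition, i.e. per "task".
    public static List<Map<String, Long>> countPerTask(
            List<List<String>> inputPartitions, int numPartitions) {
        // Stage 1: split lines into words and route each word by its new key.
        List<List<String>> repartitioned = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            repartitioned.add(new ArrayList<>());
        }
        for (List<String> lines : inputPartitions) {
            for (String line : lines) {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (word.isEmpty()) {
                        continue; // split can yield an empty leading token
                    }
                    repartitioned.get(partitionFor(word, numPartitions)).add(word);
                }
            }
        }
        // Stage 2: each task counts only the words in its own partition.
        List<Map<String, Long>> perTask = new ArrayList<>();
        for (List<String> words : repartitioned) {
            Map<String, Long> counts = new HashMap<>();
            for (String word : words) {
                counts.merge(word, 1L, Long::sum);
            }
            perTask.add(counts);
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<List<String>> input = Arrays.asList(
                Arrays.asList("hello kafka streams", "hello world"),
                Arrays.asList("kafka streams hello"));
        List<Map<String, Long>> perTask = countPerTask(input, 3);
        for (int i = 0; i < perTask.size(); i++) {
            System.out.println("task " + i + ": " + perTask.get(i));
        }
    }
}
```

Although "hello" appears in both input partitions, after stage 1 it lives in exactly one re-partitioned partition, so a single task holds its complete count and no cross-task coordination is needed.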

I'd suggest reading Kafka's documentation, such as https://kafka.apache.org/documentation/streams/architecture#streams_architecture_tasks.

Upvotes: 1
