Kannan Ekanath

Reputation: 17631

Consumer with multiple partitions does not interleave messages

I am trying to run the simple example shown at https://projectreactor.io/docs/kafka/release/reference/#_sample_consumer. I see the output described in the link, but I am unsure whether this is the expected output. Specifically, the link says:

The 20 messages published by the Producer sample should appear on the console. As shown in the output above, messages are consumed in order for each partition, but messages from different partitions may be interleaved.

The output in the link matches what I am getting. However, everything in partition 1 is consumed first, followed by everything in partition 0. What I actually expected was one message from partition 0, a couple from partition 1, then a couple from partition 0, and so on (within each partition, the messages are ordered as expected).

When I run it locally I get the same output. Am I missing something?
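The documentation's wording ("in order for each partition ... may be interleaved") can be made precise with a small check. A minimal sketch, assuming each consumed record is reduced to a hypothetical `(partition, offset)` pair and, for illustration only, 10 records per partition. Both the observed order and the expected order satisfy per-partition ordering; they differ only in how often consumption switches partitions, so interleaving is permitted but not guaranteed.

```python
from typing import List, Tuple

# Hypothetical reduction of a consumed record to (partition, offset);
# this is not a Kafka or Reactor Kafka type.
Record = Tuple[int, int]


def ordered_per_partition(records: List[Record]) -> bool:
    """True if offsets strictly increase within every partition."""
    last_offset = {}
    for partition, offset in records:
        if partition in last_offset and offset <= last_offset[partition]:
            return False
        last_offset[partition] = offset
    return True


def partition_switches(records: List[Record]) -> int:
    """Count how often two consecutive records come from different partitions."""
    return sum(1 for prev, cur in zip(records, records[1:]) if prev[0] != cur[0])


# Order described in the question: all of partition 1, then all of partition 0.
observed = [(1, o) for o in range(10)] + [(0, o) for o in range(10)]
# Order the question expected: the two partitions alternating.
expected = [(p, o) for o in range(10) for p in (0, 1)]

print(ordered_per_partition(observed), partition_switches(observed))  # True 1
print(ordered_per_partition(expected), partition_switches(expected))  # True 19
```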

Upvotes: 0

Views: 185

Answers (2)

mjuarez

Reputation: 16844

What you're seeing is expected behavior for a very small number of messages. The consumer does interleave messages when consuming from multiple partitions, but the effect only becomes visible with a large number of messages.

What happens is that Kafka consumers work in "batches". They poll periodically, and if the 10 or so messages in one partition are small enough to fit in a single poll request, or "batch", the consumer simply consumes them all at once before it even gets to the next partition. That's why you don't see any interleaving with only 20 messages.

If you retry your test with 20K messages, you should see the interleaving behavior much more clearly.
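The batching argument can be illustrated with a toy model. This is a simplified sketch, not Kafka's actual fetcher logic: it assumes each fetch round takes up to a hypothetical `fetch_limit` records from every partition that still has data, and that a whole round is processed before the next fetch. The record-count limit loosely stands in for Kafka's byte-based fetch limits such as `max.partition.fetch.bytes`. With 10 records per partition, one round holds everything and the order is partition by partition; with 10,000 per partition, the backlog spans several rounds and the partitions alternate in chunks.

```python
from typing import Dict, List, Tuple


def consumption_order(backlog: Dict[int, int], fetch_limit: int) -> List[Tuple[int, int]]:
    """Toy model returning (partition, offset) pairs in consumption order.

    `backlog` maps partition -> number of waiting records. Each fetch round takes
    up to `fetch_limit` records from every partition (in the dict's order), and
    the whole round is consumed before the next round is fetched.
    """
    next_offset = {p: 0 for p in backlog}
    order: List[Tuple[int, int]] = []
    while any(next_offset[p] < n for p, n in backlog.items()):
        for p, n in backlog.items():
            end = min(next_offset[p] + fetch_limit, n)
            order.extend((p, o) for o in range(next_offset[p], end))
            next_offset[p] = end
    return order


def partition_switches(order: List[Tuple[int, int]]) -> int:
    """Count how often two consecutive records come from different partitions."""
    return sum(1 for a, b in zip(order, order[1:]) if a[0] != b[0])


# fetch_limit=1000 is an arbitrary illustrative value, not a Kafka default.
small = consumption_order({1: 10, 0: 10}, fetch_limit=1000)
large = consumption_order({1: 10_000, 0: 10_000}, fetch_limit=1000)
print(partition_switches(small))  # 1: all of partition 1, then all of partition 0
print(partition_switches(large))  # 19: partitions alternate in chunks of 1000
```

In both cases each partition is still consumed in offset order; only the granularity of the interleaving changes.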

Upvotes: 2

mazaneicha

Reputation: 9425

+1 to @mjuarez's answer. I just wanted to add that you may also be able to reproduce interleaved messages by reducing max.poll.records for your consumer to 1 (the default is 500), which forces it to process one message at a time.

From the Kafka reference documentation:

NAME: max.poll.records
DESCRIPTION: The maximum number of records returned in a single call to poll().
TYPE: int
DEFAULT: 500
VALID VALUES: [1,...]
IMPORTANCE: medium

Upvotes: 2
