Raman

Reputation: 19565

KafkaConsumer: consume all available messages once, and exit

I want to create a KafkaConsumer, with Kafka 2.0.0, that consumes all available messages once, and exits immediately. This differs slightly from the standard console consumer utility because that utility waits for a specified timeout for new messages, and only exits once that timeout has expired.

This seemingly simple task turns out to be surprisingly hard with the KafkaConsumer. My gut reaction was the following pseudo-code:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
do
  result = consumer.poll(Duration.ofMillis(0))
  // onResult(result)
while result is not empty

However, this does not work: poll always returns an empty collection, even though there are many messages on the topic.

While researching this, I found suggestions that one reason may be that assign/subscribe are lazy, and that partitions are not assigned until a poll loop has completed (although I cannot find any support for this assertion in the docs). However, the following pseudo-code also returns an empty collection on each call to poll:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// returns nothing
result = consumer.poll(Duration.ofMillis(0))
// deprecated poll also returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
// returns nothing
result = consumer.poll(0)
...

So clearly "laziness" is not the issue.

The javadoc states:

This method returns immediately if there are records available.

which seems to imply that the first pseudo-code above should work. However, it does not.

The only thing that seems to work is to specify a non-zero timeout on poll, and not just any non-zero value: 1, for example, doesn't work. This indicates that there is some non-deterministic behavior inside poll, which assumes that poll will always be called in an infinite loop and that it doesn't matter if it occasionally returns an empty collection despite messages being available. The code seems to confirm this, with various checks for an expired timeout sprinkled throughout the poll implementation and its callees.

So with the naive approach, a longer timeout is obviously required (ideally Long.MAX_VALUE, to avoid the non-deterministic behavior of a shorter poll interval), but unfortunately this causes the consumer to block on the last poll, which isn't desired in this situation. The naive approach therefore forces a trade-off between how deterministic we want the behavior to be and how long we have to wait for no reason on the last poll. How do we avoid this?

Upvotes: 1

Views: 4771

Answers (2)

Raman

Reputation: 19565

The only way to accomplish this seems to be with some additional logic that manages the offsets itself. Here is the pseudo-code:

consumer.assign(all partitions)
consumer.seekToBeginning(all partitions)
// record the current ending offsets and poll until we get there
endOffsets = consumer.endOffsets(all partitions)

do
  result = consumer.poll(NONTRIVIAL_TIMEOUT)
  // onResult(result)
while any partition p has consumer.position(p) < endOffsets[p]

and an implementation in Kotlin:

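import java.time.Duration
import org.apache.kafka.common.TopicPartition

// assumes `consumer` (a KafkaConsumer), `topic` and `onResult` are defined elsewhere
// assign every partition of the topic explicitly, then rewind to the beginning
val topicPartitions = consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
consumer.assign(topicPartitions)
consumer.seekToBeginning(consumer.assignment())

// snapshot the end offsets once; the loop stops when every partition reaches them
val endOffsets = consumer.endOffsets(consumer.assignment())
fun pendingMessages() = endOffsets.any { consumer.position(it.key) < it.value }

do {
  val records = consumer.poll(Duration.ofMillis(1000))
  onResult(records)
} while (pendingMessages())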

The poll duration can now be set to a reasonable value (such as 1s) without concern about missing messages, since the loop continues until the consumer reaches the end offsets captured before the loop started.
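To see why the exit condition is based on offsets rather than on an empty poll, here is a small self-contained simulation. It does not use the Kafka client at all: `FakeConsumer` is a hypothetical in-memory stand-in for a single-partition consumer that, like the behavior described in the question, sometimes returns an empty batch even though messages remain. `drain` mirrors the offset-driven loop above, and `drainNaive` mirrors the question's stop-on-empty loop. This is a sketch under those assumptions, not a model of KafkaConsumer internals.

```kotlin
// Hypothetical in-memory stand-in for a single-partition KafkaConsumer.
// Every `emptyEvery`-th poll returns an empty batch even if messages remain.
class FakeConsumer(private val log: List<String>, private val emptyEvery: Int, private val maxBatch: Int) {
    init { require(emptyEvery >= 2 && maxBatch >= 1) }

    var position = 0L          // offset of the next message to fetch
        private set
    private var calls = 0

    fun endOffset(): Long = log.size.toLong()   // one past the last message

    fun seekToBeginning() { position = 0L }

    fun poll(): List<String> {
        calls++
        if (calls % emptyEvery == 0) return emptyList()   // spurious empty result
        val end = minOf(position + maxBatch, log.size.toLong())
        val batch = log.subList(position.toInt(), end.toInt())
        position = end
        return batch
    }
}

// Offset-driven loop: stop only when the position reaches the end offset
// captured before the loop, regardless of empty polls along the way.
fun drain(consumer: FakeConsumer): List<String> {
    consumer.seekToBeginning()
    val endOffset = consumer.endOffset()
    val received = mutableListOf<String>()
    do {
        received += consumer.poll()
    } while (consumer.position < endOffset)
    return received
}

// The question's naive loop: stop at the first empty poll.
fun drainNaive(consumer: FakeConsumer): List<String> {
    consumer.seekToBeginning()
    val received = mutableListOf<String>()
    do {
        val batch = consumer.poll()
        received += batch
    } while (batch.isNotEmpty())
    return received
}
```

With five messages, batches of at most two, and every second poll empty, `drain` returns all five messages, while `drainNaive` stops after the first two because the second poll comes back empty.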

There is one other corner case to consider: if the end offsets have changed, but there are actually no messages between the current position and the end offset, then each poll will block until its timeout expires, and the position may never reach the captured end offset. So it is important that the timeout not be set too low (otherwise the consumer will time out before retrieving messages that are available), and also not too high (otherwise the consumer will take too long to time out when waiting for messages that are not available). The latter situation can happen if those messages were deleted, or if the topic was deleted and recreated.
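One possible safeguard for this case, which is not part of the approach above, is to give up after a number of consecutive empty polls so the loop cannot keep waiting indefinitely for an end offset it will never reach. The sketch below is again a broker-free Kotlin simulation: `GappyConsumer` is a hypothetical stand-in whose reported end offset is larger than what can actually be fetched, and `maxEmptyPolls` is a hypothetical tuning parameter.

```kotlin
// Hypothetical single-partition stand-in whose reported end offset can be
// larger than what is actually fetchable (e.g. the messages were deleted).
class GappyConsumer(private val log: List<String>, private val reportedEnd: Long) {
    var position = 0L          // offset of the next message to fetch
        private set
    var polls = 0              // number of poll() calls made so far
        private set

    fun endOffset(): Long = reportedEnd

    fun poll(): List<String> {
        polls++
        // a real poll would block for its full timeout before returning empty
        if (position >= log.size) return emptyList()
        val batch = listOf(log[position.toInt()])
        position++
        return batch
    }
}

// Offset-driven loop with an extra exit: stop after `maxEmptyPolls`
// consecutive empty polls even if the end offset was never reached.
fun drainWithGuard(consumer: GappyConsumer, maxEmptyPolls: Int): List<String> {
    require(maxEmptyPolls >= 1)
    val endOffset = consumer.endOffset()
    val received = mutableListOf<String>()
    var emptyPolls = 0
    while (consumer.position < endOffset && emptyPolls < maxEmptyPolls) {
        val batch = consumer.poll()
        if (batch.isEmpty()) { emptyPolls++ } else { emptyPolls = 0 }
        received += batch
    }
    return received
}
```

With three fetchable messages and a reported end offset of 10, `drainWithGuard(..., maxEmptyPolls = 3)` returns the three messages after six polls instead of looping forever. Against a real consumer, each empty poll costs a full poll timeout, so the worst-case extra wait is roughly `maxEmptyPolls` times that timeout.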

Upvotes: 2

Adam Kotwasinski

Reputation: 4554

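If no one is producing concurrently, you can also use endOffsets to find the last message and consume until you reach it. Note that the end offset is the offset of the next message to be written, i.e. one past the last existing message.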

So, in pseudocode:

long lastOffset = consumer.endOffsets([partition])[partition] - 1 // end offset is one past the last message
long currentOffset = -1
while (currentOffset < lastOffset) {
  records = consumer.poll(NONTRIVIAL_TIMEOUT) // discussed in your answer
  if (records is not empty)
    currentOffset = max offset in records
}

This way we avoid the final blocking poll, since we are always sure there is something left to receive.

You might need to add safeguards for the case where your consumer's position is already equal to the end offset (since there are no messages left to receive).

Also, you might want to set max.poll.records to 1, so that you don't consume messages positioned after the end offset if someone is producing in parallel.
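The idea can be simulated without a broker. In this sketch, `Rec` and `PartitionSim` are hypothetical stand-ins for a consumer record and a single-partition KafkaConsumer; `maxPollRecords` imitates the max.poll.records setting, and `concurrentWrites` simulates a producer that appends one message after every poll. The loop stops once the highest offset seen reaches end offset - 1, and starting `currentOffset` at position - 1 covers the case where the position already equals the end offset.

```kotlin
// Hypothetical record type: just an offset and a value.
data class Rec(val offset: Long, val value: String)

// Hypothetical single-partition consumer simulation.
// `maxPollRecords` imitates max.poll.records; `concurrentWrites` appends
// one new message to the log after every poll.
class PartitionSim(initial: List<String>, private val maxPollRecords: Int, private val concurrentWrites: Boolean) {
    private val log = initial.toMutableList()
    var position = 0L          // offset of the next message to fetch
        private set

    fun endOffset(): Long = log.size.toLong()   // one past the last message

    fun poll(): List<Rec> {
        val end = minOf(position + maxPollRecords, log.size.toLong())
        val batch = (position until end).map { Rec(it, log[it.toInt()]) }
        position = end
        if (concurrentWrites) log += "late-${log.size}"   // someone keeps producing
        return batch
    }
}

// Consume until the highest offset seen reaches the last offset that
// existed when we started (end offset - 1).
fun consumeUntilLast(consumer: PartitionSim): List<Rec> {
    val lastOffset = consumer.endOffset() - 1
    // safeguard: if position == end offset, the loop body never runs
    var currentOffset = consumer.position - 1
    val received = mutableListOf<Rec>()
    while (currentOffset < lastOffset) {
        val records = consumer.poll()
        if (records.isNotEmpty()) currentOffset = records.maxOf { it.offset }
        received += records
    }
    return received
}
```

With five initial messages and concurrent writes, `maxPollRecords = 1` returns exactly the five original messages, while `maxPollRecords = 3` also picks up one late message (`late-5`) in its final batch, which is the overshoot that setting max.poll.records to 1 avoids.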

Upvotes: 0
