Tobias Hermann

Reputation: 10936

How to wait for full Kafka-message batch with Spring Boot?

When batch-consuming Kafka messages, one can limit the batch size using max.poll.records.

If the consumer is fast and its committed offset does not lag significantly, most batches will be much smaller than that limit. I'd like to receive only "full" batches, i.e., have my consumer function invoked only when the batch size is reached. So I'm looking for something like min.poll.records, which does not exist in that form.

Here is a minimal example of what I'm doing:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        // Cap each delivered batch at 1000 records; there is no matching lower bound.
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        // Deliver each poll result to the listener as a list instead of one record at a time.
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}

When started with a bit of consumer lag, it outputs something like:

[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]

Is there any way (without manually sleep-ing in the consumer handler in case of "incomplete" batches) to have the function invoked only when one of the following two conditions is met?

- at least n messages have arrived, or
- at least m milliseconds were spent waiting

Upvotes: 4

Views: 4790

Answers (3)

PVS

Reputation: 73

While you wait for the batch to fill (accrue to 300 messages), the offsets are still committed every time the listener returns and polls again. Each return commits the previous poll's records, even though you may not have processed them yet, since you are only holding them in the buffer.

If there is a failure (a listener crash, for example), you will lose the messages in the buffer. This may not be an issue for your use case, but I just wanted to highlight the possibility.
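For illustration, a minimal sketch of one way to avoid that loss, assuming manual acknowledgment is acceptable for your use case: switch the container to AckMode.MANUAL and only acknowledge once a full buffer has actually been processed. The 300-record threshold and the processBatch function are placeholders, not from the original code:

import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class ManualAckBufferingConsumer {
    @Bean
    fun manualAckContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties())
        // Offsets are only committed when the listener calls Acknowledgment.acknowledge().
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        return factory
    }

    // No synchronization needed: the default container uses a single consumer thread.
    private val buffer = mutableListOf<String>()

    @KafkaListener(topics = ["myTopic"], containerFactory = "manualAckContainerFactory")
    fun listen(value: String, ack: Acknowledgment) {
        buffer.add(value)
        if (buffer.size >= 300) {
            processBatch(buffer) // hypothetical processing step
            buffer.clear()
            ack.acknowledge() // commit only after the buffered records were processed
        }
        // Not acknowledging leaves the offset uncommitted, so a crash replays
        // the buffered records instead of losing them.
    }

    private fun processBatch(records: List<String>) {
        println(records.size)
    }
}

A time-based flush is omitted here, since acknowledging from a thread other than the consumer thread comes with its own caveats.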

Upvotes: 0

Tobias Hermann

Reputation: 10936

Since, as Gary Russell nicely pointed out, it's currently not possible to make Kafka itself do what I was looking for, here is my solution with manual buffering, which achieves the desired behavior:

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.*
import javax.annotation.PreDestroy

@SpringBootApplication
@EnableScheduling // needed so that @Scheduled(fixedDelay = 700) actually fires
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

@Component
class TestConsumer {
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String) {
        addToBuffer(value)
    }

    // Note: a Set, so identical payloads within one flush window collapse into a single entry.
    private val buffer = mutableSetOf<String>()

    @Synchronized
    fun addToBuffer(message: String) {
        buffer.add(message)
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    // Runs when the buffer is full (via addToBuffer), every 700 ms via the
    // scheduler, and once more on shutdown.
    @Synchronized
    @Scheduled(fixedDelay = 700)
    @PreDestroy
    fun flushBuffer() {
        if (buffer.isEmpty()) {
            return
        }
        val timestamp = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(Date())
        println("$timestamp: ${buffer.count()}")
        buffer.clear()
    }
}

Example output:

[...]
2020-01-03T07:01:13.032: 300
2020-01-03T07:01:13.041: 300
2020-01-03T07:01:13.078: 300
2020-01-03T07:01:13.133: 300
2020-01-03T07:01:13.143: 300
2020-01-03T07:01:13.188: 300
2020-01-03T07:01:13.197: 300
2020-01-03T07:01:13.321: 300
2020-01-03T07:01:13.352: 300
2020-01-03T07:01:13.359: 300
2020-01-03T07:01:13.399: 300
2020-01-03T07:01:13.407: 300
2020-01-03T07:01:13.533: 300
2020-01-03T07:01:13.571: 300
2020-01-03T07:01:13.580: 300
2020-01-03T07:01:13.607: 300
2020-01-03T07:01:13.611: 300
2020-01-03T07:01:13.632: 300
2020-01-03T07:01:13.682: 300
2020-01-03T07:01:13.687: 300
2020-01-03T07:01:13.708: 300
2020-01-03T07:01:13.712: 300
2020-01-03T07:01:13.738: 300
2020-01-03T07:01:13.880: 300
2020-01-03T07:01:13.884: 300
2020-01-03T07:01:13.911: 300
2020-01-03T07:01:14.301: 300
2020-01-03T07:01:14.714: 300
2020-01-03T07:01:15.029: 300
2020-01-03T07:01:15.459: 300
2020-01-03T07:01:15.888: 300
2020-01-03T07:01:16.359: 300
[...]

So we see that, after catching up with the consumer lag, it keeps delivering batches of 300, matching the topic throughput.

Yes, the @Synchronized does kill concurrent processing, but in my use case this part is far from being the bottleneck.

Upvotes: 2

Gary Russell

Reputation: 174514

Kafka has no min.poll.records; you can approximate it using fetch.min.bytes if your records are a similar length. Also see fetch.max.wait.ms.
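For illustration, a minimal sketch of how those two properties could be set on the factory from the question; the ~200-byte average record size used to derive fetch.min.bytes is an assumption:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
    val configs = kafkaProperties.buildConsumerProperties()
    configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
    // Ask the broker to delay the fetch response until roughly 300 records'
    // worth of bytes are available (assuming ~200 bytes per record)...
    configs[ConsumerConfig.FETCH_MIN_BYTES_CONFIG] = 300 * 200
    // ...but to return whatever has accumulated after at most 700 ms.
    configs[ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG] = 700
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
    factory.isBatchListener = true
    return factory
}

Note that fetch.min.bytes applies to a whole fetch request sent to a broker, across all of its assigned partitions, so this only approximates a per-batch minimum.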

Upvotes: 8
