Reputation: 10936
When batch-consuming Kafka messages, one can limit the batch size using max.poll.records.
If the consumer is very fast and its commit offset does not lag significantly, most batches will be much smaller. I'd like to only receive "full" batches, i.e., have my consumer function invoked only when the batch size is reached. So I'm looking for something like min.poll.records, which does not exist in that form.
Here is a minimal example of what I'm doing:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component
@SpringBootApplication
class Application
fun main(args: Array<String>) {
    runApplication<Application>(*args)
}
@Component
class TestConsumer {
    @Bean
    fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
        factory.isBatchListener = true
        return factory
    }

    @KafkaListener(
        topics = ["myTopic"],
        containerFactory = "kafkaBatchListenerContainerFactory"
    )
    fun batchListen(values: List<ConsumerRecord<String, String>>) {
        println(values.count())
    }
}
When started with a bit of consumer lag, it outputs something like:
[...]
1000
1000
1000
[...]
1000
1000
1000
256
27
8
9
3
1
1
23
[...]
Is there any way (without manually sleep-ing in the consumer handler in case of "incomplete" batches) to have the function invoked only when one of the following two conditions is met?
- at least n messages are there, or
- at least m milliseconds were spent waiting
Upvotes: 4
Views: 4790
Reputation: 73
While you wait for the batch to complete (to accrue to 300), your offsets will be committed each time the listener returns to fetch: every time it goes back, it commits the previous batch, even though you may not have processed those records yet, since you are holding them in the buffer.
If there is a failure (a listener crash, for example), you will lose the messages in the buffer. This may not be an issue for your use case, but I just wanted to highlight the possibility.
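One way to mitigate this is to switch to manual acknowledgment and commit only after the buffer has actually been flushed, so a crash replays the buffered records instead of losing them. The following is a minimal sketch, not the original poster's code: it assumes a container factory configured with ContainerProperties.AckMode.MANUAL and enable.auto.commit=false, with the topic name and the 300-record threshold carried over from the question.
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class ManualAckBufferingConsumer {
    private val buffer = mutableListOf<String>()

    // Assumes the listener container uses ContainerProperties.AckMode.MANUAL,
    // so offsets are committed only when acknowledge() is called.
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String, ack: Acknowledgment) {
        buffer.add(value)
        if (buffer.size >= 300) {
            println(buffer.count()) // process the full batch here
            buffer.clear()
            // Committing here covers all records up to this one; a crash
            // before this point means the buffered messages are redelivered.
            ack.acknowledge()
        }
    }
}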
Upvotes: 0
Reputation: 10936
Since, as nicely pointed out by Gary Russell, it's currently not possible to make Kafka do what I was looking for, here is my solution with manual buffering, which achieves the desired behavior:
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.text.SimpleDateFormat
import java.util.*
import javax.annotation.PreDestroy

@SpringBootApplication
@EnableScheduling // required so that @Scheduled methods are actually invoked
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}
@Component
class TestConsumer {
    @KafkaListener(topics = ["myTopic"])
    fun listen(value: String) {
        addToBuffer(value)
    }

    private val buffer = mutableSetOf<String>()

    @Synchronized
    fun addToBuffer(message: String) {
        buffer.add(message)
        if (buffer.size >= 300) {
            flushBuffer()
        }
    }

    // Flushes when the buffer is full, every 700 ms, and on shutdown,
    // whichever comes first.
    @Synchronized
    @Scheduled(fixedDelay = 700)
    @PreDestroy
    fun flushBuffer() {
        if (buffer.isEmpty()) {
            return
        }
        val timestamp = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(Date())
        println("$timestamp: ${buffer.count()}")
        buffer.clear()
    }
}
Example output:
[...]
2020-01-03T07:01:13.032: 300
2020-01-03T07:01:13.041: 300
2020-01-03T07:01:13.078: 300
2020-01-03T07:01:13.133: 300
2020-01-03T07:01:13.143: 300
2020-01-03T07:01:13.188: 300
2020-01-03T07:01:13.197: 300
2020-01-03T07:01:13.321: 300
2020-01-03T07:01:13.352: 300
2020-01-03T07:01:13.359: 300
2020-01-03T07:01:13.399: 300
2020-01-03T07:01:13.407: 300
2020-01-03T07:01:13.533: 300
2020-01-03T07:01:13.571: 300
2020-01-03T07:01:13.580: 300
2020-01-03T07:01:13.607: 300
2020-01-03T07:01:13.611: 300
2020-01-03T07:01:13.632: 300
2020-01-03T07:01:13.682: 300
2020-01-03T07:01:13.687: 300
2020-01-03T07:01:13.708: 300
2020-01-03T07:01:13.712: 300
2020-01-03T07:01:13.738: 300
2020-01-03T07:01:13.880: 300
2020-01-03T07:01:13.884: 300
2020-01-03T07:01:13.911: 300
2020-01-03T07:01:14.301: 300
2020-01-03T07:01:14.714: 300
2020-01-03T07:01:15.029: 300
2020-01-03T07:01:15.459: 300
2020-01-03T07:01:15.888: 300
2020-01-03T07:01:16.359: 300
[...]
So we see that, after catching up with the consumer lag, it provides batches of 300, matching the topic throughput.
Yes, the @Synchronized does kill concurrent processing, but in my use case this part is far from being the bottleneck.
Upvotes: 2
Reputation: 174514
Kafka has no min.poll.records; you can approximate it using fetch.min.bytes if your records are a similar length. Also see fetch.max.wait.ms.
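For illustration, here is how the question's container factory could set both properties. This is a sketch: the 300_000-byte threshold assumes records of roughly 1 kB each (so approximately 300 records per fetch) and would need to be tuned to the actual record size.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Bean
fun kafkaBatchListenerContainerFactory(kafkaProperties: KafkaProperties): ConcurrentKafkaListenerContainerFactory<String, String> {
    val configs = kafkaProperties.buildConsumerProperties()
    configs[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1000
    // The broker holds each fetch until this many bytes are available
    // (assumption: ~1 kB per record, so roughly 300 records)...
    configs[ConsumerConfig.FETCH_MIN_BYTES_CONFIG] = 300_000
    // ...or until this much time has passed, whichever comes first.
    configs[ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG] = 700
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configs)
    factory.isBatchListener = true
    return factory
}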
Upvotes: 8