Reputation: 7084
I have a Kafka listener in Spring Boot:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
    myService.processResponse(response);
}
For example, the producer sends one message every second, but myService.processResponse takes 10 seconds. I need to handle each message and run myService.processResponse in a new thread. I could create my own executor and delegate each response to it, but I suspect Kafka already has configuration for this. I found two options:
1) Add concurrency = "5" to the @KafkaListener annotation. This seems to work, but I'm not sure it is correct, because there is a second way:
2) Create a ConcurrentKafkaListenerContainerFactory and set a ConsumerFactory and a concurrency value on it.
What is the difference between these two methods? Is it enough to add concurrency = "5" to the @KafkaListener annotation, or do I need to create a ConcurrentKafkaListenerContainerFactory?
Or am I misunderstanding this entirely, and is there another way?
Upvotes: 2
Views: 13475
Reputation: 163
This is a fairly old question, but I don't see an actual answer here about processing messages on different threads.
If you don't mind offsets being committed regardless of whether processing has finished, you can put @Async on your processing logic (and make sure @EnableAsync is present on your application class).
Read the messages in your listener:
@KafkaListener(topics = "my_topic")
void listen(ConsumerRecord<String, String> record) {
    messageHandlingService.processMessage(record.value());
}
Then, in a separate service, annotate the processing method as async:
@Async
public void processMessage(String messageString) {
    // do stuff here
}
Now you can read multiple messages without being blocked by processing them. As with all things, there are challenges, caveats, and possible configurations, but this might help get you started.
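To make the caveat about committing concrete, here is a minimal plain-Java sketch with no Spring or Kafka involved. The class HandOffDemo and its latches are hypothetical stand-ins: one latch plays the role of the slow job, and the "listener" is just a submit call. It only illustrates that once the work is handed to another thread, the listener has returned while processing is still pending, which is the point at which a listener container could already commit the offset.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of Spring or Kafka.
final class HandOffDemo {

    /** Returns true if the "listener" returned while processing was still pending. */
    static boolean listenerReturnsBeforeProcessingEnds() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch slowWork = new CountDownLatch(1);   // stands in for the slow job
        CountDownLatch processed = new CountDownLatch(1);  // released when the job is done
        try {
            // "Listener": hands the record to another thread and returns immediately.
            pool.submit(() -> {
                slowWork.await();        // processing is still running here
                processed.countDown();   // processing finishes only now
                return null;
            });

            // The hand-off has returned, but the work cannot have finished yet.
            boolean stillPending = processed.getCount() == 1;

            slowWork.countDown();        // let the job complete
            processed.await();
            return stillPending;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

If the application crashes between the hand-off and the end of processing, a record whose offset was already committed would not be redelivered, so this approach fits best when occasional loss is acceptable.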
Upvotes: 3
Reputation: 9675
The concurrency option has nothing to do with concurrently processing the messages received by the same consumer. It is for consumer groups, where you have multiple consumers that each process their own partitions.
Passing the processing to a separate thread is very complicated, and I believe the Spring-Kafka team decided not to do that by design. You don't even need to dig into Spring-Kafka to understand why. See the "Detecting Consumer Failures" section of the KafkaConsumer javadoc:
Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.
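To illustrate why this bookkeeping is awkward, here is a minimal sketch of the rule "committed offsets must not get ahead of the actual position" for a single partition. OffsetTracker is a hypothetical helper, not a Spring-Kafka or Kafka client class, and it assumes worker threads report each offset when they finish it. Pausing partitions and the commit call itself are left out.

```java
import java.util.TreeSet;

// Hypothetical helper for one partition; not a Kafka or Spring-Kafka API.
final class OffsetTracker {
    private final TreeSet<Long> completed = new TreeSet<>();
    private long nextToCommit; // first offset that is NOT yet safe to commit

    OffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Called by a worker thread once it has finished the record at this offset. */
    synchronized void markDone(long offset) {
        completed.add(offset);
        // Advance only across a contiguous run of finished offsets,
        // so a slow record holds back everything after it.
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    /** Offset to use for a manual commit: every record below it has been processed. */
    synchronized long committableOffset() {
        return nextToCommit;
    }
}
```

For example, if offsets 1 and 2 finish before offset 0, committableOffset() stays at 0 until offset 0 is also marked done, and only then moves to 3.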
Upvotes: 6
Reputation: 174484
Using an executor makes things complicated, with regard to managing committed offsets; it is not recommended.
With @KafkaListener, the framework creates a ConcurrentKafkaListenerContainerFactory for you.
concurrency on the annotation is just a convenience; it overrides the factory setting.
This allows you to use the same factory with multiple listeners, each with different concurrency.
You can set the default container concurrency using a Boot property; the annotation value overrides it. See the javadocs:
/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";
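As a rough illustration of how the two options from the question relate, here is a configuration fragment. It is a sketch, not a complete application: it needs spring-kafka on the classpath and assumes a ConsumerFactory bean is already available (for example from Spring Boot auto-configuration). The class names KafkaListenerConfig and ResponseListener are made up for the example. The Boot property referred to above should be spring.kafka.listener.concurrency, which is only applied to the factory that Boot auto-configures.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
class KafkaListenerConfig {

    // Option 2 from the question: define the factory yourself.
    // The bean name matches the default factory that @KafkaListener looks up.
    @Bean
    ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // default for every listener that uses this factory
        return factory;
    }
}

@Component
class ResponseListener {

    // Option 1 from the question: the annotation value overrides the factory's 3.
    @KafkaListener(topics = "topic-one", groupId = "response", concurrency = "5")
    public void listen(String response) {
        // process the response here
    }
}
```

Either way the result is several consumers in the same group, each on its own thread. Because a partition is assigned to only one consumer in a group at a time, a concurrency higher than the topic's partition count leaves the extra consumers idle.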
Upvotes: 7