ip696

Reputation: 7084

How can I process messages from a @KafkaListener method in different threads?

I have a Kafka handler in Spring Boot:

    @KafkaListener(topics = "topic-one", groupId = "response")
    public void listen(String response) {
        myService.processResponse(response);
    }

For example, the producer sends one message every second, but myService.processResponse takes 10 seconds. I need to handle each message and run myService.processResponse in a new thread. I could create my own executor and delegate each response to it, but I think Kafka has configuration options for this. I found two:

1) add concurrency = "5" to the @KafkaListener annotation. It seems to work, but I'm not sure it's correct, because there is a second way:

2) create a ConcurrentKafkaListenerContainerFactory and set its ConsumerFactory and concurrency.

I don't understand the difference between these methods. Is it enough to just add concurrency = "5" to the @KafkaListener annotation, or do I need to create a ConcurrentKafkaListenerContainerFactory?

Or am I not understanding this at all, and is there another way?
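For reference, option 2 would look something like this (a sketch; the bean method name is a common Spring Boot convention, not something from the question):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // One listener container (and consumer thread) per unit of concurrency
        factory.setConcurrency(5);
        return factory;
    }
}
```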

Upvotes: 2

Views: 13475

Answers (3)

entimaniac

Reputation: 163

Kinda old question I found here, but I don't think I see an actual answer about processing messages on different threads.

If you don't mind committing your messages with no regard to processing, you can take advantage of @Async on your processing logic (and make sure your application is annotated with @EnableAsync).

Read the messages in your listener:

    @KafkaListener(topics = "my_topic")
    void listen(ConsumerRecord<String, String> record) {
        messageHandlingService.processMessage(record.value());
    }

Then, in a separate service, annotate your processing method with @Async:

    @Async
    public void processMessage(String messageString) {
        // do stuff here
    }

Now you can read multiple messages without being blocked by processing them. As with all things, there are challenges, caveats, and possible configurations, but this might help get you started.
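As a sketch of the wiring this approach assumes (the executor bean and its pool sizes are illustrative, not from the answer):

```java
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {

    // @Async uses a default executor if none is defined; declaring one
    // makes the pool size for message processing explicit.
    @Bean(name = "processingExecutor")
    public Executor processingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);       // threads kept alive
        executor.setMaxPoolSize(10);       // upper bound under load
        executor.setQueueCapacity(100);    // backlog before tasks are rejected
        executor.setThreadNamePrefix("msg-proc-");
        executor.initialize();
        return executor;
    }
}
```

The processing method can then target it with @Async("processingExecutor").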

Upvotes: 3

yuranos

Reputation: 9675

The concurrency option has nothing to do with concurrently processing the messages received by the same consumer. It's for consumer groups, where multiple consumers each process their own partitions.

Passing the processing to a separate thread is very complicated, and I believe the Spring Kafka team decided not to do that "by design". You don't even need to dig into Spring Kafka to understand why. Check the Detecting Consumer Failures section of the KafkaConsumer docs:

Some care must be taken to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic commits and manually commit processed offsets for records only after the thread has finished handling them (depending on the delivery semantics you need). Note also that you will need to pause the partition so that no new records are received from poll until after thread has finished handling those previously returned.
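With the plain KafkaConsumer API, that advice looks roughly like the following (a simplified sketch; the topic, group id, and processing step are made up for illustration, and a real implementation needs error handling and rebalance listeners):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "response");
        props.put("enable.auto.commit", "false"); // commit manually, after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-one"));
            Future<?> inFlight = null;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    // Pause so further polls return nothing while the worker runs,
                    // but keep calling poll() so the consumer stays alive in the group.
                    consumer.pause(consumer.assignment());
                    inFlight = worker.submit(() -> {
                        for (ConsumerRecord<String, String> record : records) {
                            process(record.value()); // slow work, off the polling thread
                        }
                    });
                }
                if (inFlight != null && inFlight.isDone()) {
                    consumer.commitSync(); // only after the thread finished handling
                    consumer.resume(consumer.paused());
                    inFlight = null;
                }
            }
        }
    }

    static void process(String value) { /* slow processing here */ }
}
```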

Upvotes: 6

Gary Russell

Reputation: 174484

Using an executor makes things complicated, with regard to managing committed offsets; it is not recommended.

With @KafkaListener, the framework creates a ConcurrentKafkaListenerContainerFactory for you.

concurrency on the annotation is just a convenience; it overrides the factory setting.

This allows you to use the same factory with multiple listeners, each with different concurrency.

You can set the container concurrency (default) using a boot property; that value is overridden by the annotation value; see the javadocs...

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";
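For example (the spring.kafka.listener.concurrency property is from Spring Boot; the listener itself is illustrative):

```java
// application.properties — default concurrency for containers built by Boot's factory:
// spring.kafka.listener.concurrency=3

// The annotation value overrides the factory/Boot default for this listener only:
@KafkaListener(topics = "topic-one", groupId = "response", concurrency = "5")
public void listen(String response) {
    myService.processResponse(response);
}
```

Note that concurrency greater than the number of partitions leaves the extra consumers idle, since each partition is assigned to at most one consumer in the group.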

Upvotes: 7
