djotanov
djotanov

Reputation: 3

Spring cloud stream - Autowiring underlying Consumer for a given PollableMessageSource

Is it possible to get a hold of underlying KafkaConsumer bean for a defined PollableMessageSource?

I have Binding defined as:

public interface TestBindings {

    String TEST_SOURCE =  "test";

    @Input(TEST_SOURCE)
    PollableMessageSource testTopic();
}

and config class:

@EnableBinding(TestBindings.class)
public class TestBindingsPoller {
    @Bean
    public ApplicationRunner testPoller(PollableMessageSource testTopic) {
        // Get kafka consumer for PollableMessageSource
        KafkaConsumer kafkaConsumer = getConsumer(testTopic);
        return args -> {
            while (true) {
                if (!testTopic.poll(...) {
                    Thread.sleep(500);
                }
            }
        };
    }
}

The question is, how can I get KafkaConsumer that corresponds to testTopic? Is there any way to get it from beans that are wired in spring cloud stream?

Upvotes: 0

Views: 200

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121272

The KafkaMessageSource populates a KafkaConsumer into headers, so it is available in the place you receive messages: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java#L57.

If you are going to do stuff like poll yourself, I would suggest to inject a ConsumerFactory and use a consumer from there already.

Upvotes: 1

Related Questions