Reputation: 3
Is it possible to get hold of the underlying KafkaConsumer bean for a defined PollableMessageSource?
I have a binding defined as:
public interface TestBindings {
    String TEST_SOURCE = "test";

    @Input(TEST_SOURCE)
    PollableMessageSource testTopic();
}
and a config class:
@EnableBinding(TestBindings.class)
public class TestBindingsPoller {

    @Bean
    public ApplicationRunner testPoller(PollableMessageSource testTopic) {
        // Get kafka consumer for PollableMessageSource
        KafkaConsumer kafkaConsumer = getConsumer(testTopic);
        return args -> {
            while (true) {
                // poll(...) returns false when no message was received, so back off briefly
                if (!testTopic.poll(...)) {
                    Thread.sleep(500);
                }
            }
        };
    }
}
The question is, how can I get the KafkaConsumer that corresponds to testTopic? Is there any way to get it from the beans that are wired in Spring Cloud Stream?
Upvotes: 0
Views: 200
Reputation: 121272
The KafkaMessageSource populates a KafkaConsumer into the message headers, so it is available wherever you receive messages: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java#L57.
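As a rough sketch of what that could look like with the pollable source from the question: the handler passed to poll reads the consumer from the KafkaHeaders.CONSUMER header. The class and method names here (ConsumerHeaderExample, consumerOf, pollOnce) are made up for illustration, and whether the header is present depends on the binder and spring-kafka versions in use, so the code checks for null.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

// Hypothetical helper class, not part of Spring Cloud Stream.
class ConsumerHeaderExample {

    // Returns the consumer stored in the message headers, or null if the header is absent.
    static Consumer<?, ?> consumerOf(Message<?> message) {
        return message.getHeaders().get(KafkaHeaders.CONSUMER, Consumer.class);
    }

    // Polls the source once; the handler runs only if a message was received.
    static boolean pollOnce(PollableMessageSource source) {
        return source.poll(message -> {
            Consumer<?, ?> consumer = consumerOf(message);
            if (consumer != null) {
                // Example use: inspect the partitions currently assigned to this consumer.
                System.out.println("Assigned partitions: " + consumer.assignment());
            }
        });
    }
}
```

Keep in mind that a Kafka consumer is not thread-safe, so it should only be touched on the thread that is doing the polling, i.e. inside the handler.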
If you are going to call poll yourself, I would suggest injecting a ConsumerFactory and creating a consumer from it directly.
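A minimal sketch of that second option, under some assumptions: a ConsumerFactory<String, String> bean is available in the context (configured with a group id and String deserializers), and "test" is the actual topic name, which is only a placeholder taken from the binding name in the question. The class name ManualPollerConfig is also made up. Note that a consumer created this way is independent of the one the binder uses for the PollableMessageSource.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; assumes a ConsumerFactory<String, String> bean exists.
@Configuration
class ManualPollerConfig {

    @Bean
    ApplicationRunner manualPoller(ConsumerFactory<String, String> consumerFactory) {
        return args -> {
            // The consumer is closed automatically when the loop exits.
            try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
                // "test" is a placeholder topic name.
                consumer.subscribe(Collections.singletonList("test"));
                while (!Thread.currentThread().isInterrupted()) {
                    // Blocks for up to 500 ms waiting for records.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.key() + " -> " + record.value());
                    }
                }
            }
        };
    }
}
```

Here the blocking poll(Duration) call replaces the Thread.sleep(500) from the question, since it already waits when no records are available.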
Upvotes: 1