I am working on a Spring Boot application. Below are my application details:
config:
spring:
  cloud:
    stream:
      function:
        bindings:
          orderEventsConsumer-in-0: order-events-in
      bindings:
        order-events-in:
          binder: bindername
          destination: topic_name
          group: consumer_group
      kafka:
        binder:
          consumerProperties:
            interceptor:
              classes: package.to.MyInterceptor
          configuration:
            # interceptor.classes and a Spring bean could also be given here, but I am not sure whether that works.
My ConsumerInterceptor:
public class MyInterceptor implements ConsumerInterceptor<Flux<Message>, Mono<Void>> {

    private TestService testService; // Spring-managed bean

    @Override
    public ConsumerRecords<Flux<Message>, Mono<Void>> onConsume(ConsumerRecords<Flux<Message>, Mono<Void>> consumerRecords) {
        // onConsume logic
        return consumerRecords;
    }

    @Override
    public void configure(Map<String, ?> config) {
        // Here I need to get the Spring-managed bean (TestService) and run a method on it.
    }
}
I am not able to get this bean in the consumer interceptor. I tried adding the configuration below:
@Bean
ConsumerConfigCustomizer customize(TestService testService) {
    return (consumerProperties, bindingname, destination) -> {
        consumerProperties.put("test.service.class", testService);
    };
}
But it didn't work; testService is still null in the consumer interceptor.
Any suggestions would be helpful. Thanks.
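For reference, here is a minimal sketch of the lookup that configure would have to do for the customizer approach to have any effect. Kafka instantiates interceptor classes itself, so Spring does not inject fields into them, and the bean has to be read from the map passed to configure. The sketch is deliberately free of Kafka and Spring dependencies: MyInterceptorSketch, its getter, and the describe method on TestService are hypothetical stand-ins, and only the key name "test.service.class" comes from the code above. It assumes the customizer is actually applied to the consumer properties of this binding, which is not confirmed here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Spring-managed bean from the question.
class TestService {
    String describe() {
        return "test-service";
    }
}

// Hypothetical stand-in for MyInterceptor. The real class would implement
// org.apache.kafka.clients.consumer.ConsumerInterceptor and also override
// onConsume, onCommit and close.
class MyInterceptorSketch {
    // Key name taken from the ConsumerConfigCustomizer in the question.
    static final String TEST_SERVICE_KEY = "test.service.class";

    private TestService testService;

    // Same shape as the configure(Map<String, ?>) callback on the interceptor.
    public void configure(Map<String, ?> configs) {
        Object candidate = configs.get(TEST_SERVICE_KEY);
        if (candidate instanceof TestService) {
            this.testService = (TestService) candidate;
        } else {
            // Fail loudly so a missing or mistyped entry is visible at startup.
            throw new IllegalStateException(
                "Expected a TestService under '" + TEST_SERVICE_KEY + "' but found: " + candidate);
        }
    }

    TestService getTestService() {
        return testService;
    }
}

public class InterceptorConfigDemo {
    public static void main(String[] args) {
        // Simulates what the customizer would put into the consumer properties.
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(MyInterceptorSketch.TEST_SERVICE_KEY, new TestService());

        MyInterceptorSketch interceptor = new MyInterceptorSketch();
        interceptor.configure(consumerProperties);
        System.out.println(interceptor.getTestService().describe());
    }
}
```

If the exception fires in the real interceptor, the entry never reached the consumer configuration, which would point at the customizer not being applied rather than at the interceptor itself.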