ging

Spring cloud stream kafka binder consumer interceptor

I am working on a Spring Boot application. Below are my application details:

config:

spring:
  cloud:
    stream:
      function:
        bindings:
          orderEventsConsumer-in-0: order-events-in
      bindings:
        order-events-in:
          binder: bindername
          destination: topic_name
          group: consumer_group
      kafka:
        binder:
          consumerProperties:
            interceptor:
              classes: package.to.MyInterceptor
          configuration:
            # interceptor.classes and a Spring bean could also be given here, but I am not sure whether that works.

My ConsumerInterceptor:

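public class MyInterceptor implements ConsumerInterceptor<Flux<Message>, Mono<Void>> {

    private TestService testService;   // Spring-managed bean

    @Override
    public ConsumerRecords<Flux<Message>, Mono<Void>> onConsume(ConsumerRecords<Flux<Message>, Mono<Void>> consumerRecords) {
        // onConsume logic goes here
        return consumerRecords;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // nothing to do on commit
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> config) {
        // configure: get a Spring-managed bean (TestService) here and run a method on that bean.
    }

}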

I am not able to get this bean in the consumer interceptor. I tried adding the configuration below:

@Bean
ConsumerConfigCustomizer customize(TestService testService) {
    return (consumerProperties, bindingName, destination) -> {
        consumerProperties.put("test.service.class", testService);
    };
}

But it didn't work; testService is still null in the consumer interceptor.
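
For reference, the reading side of this approach might look roughly like the sketch below. It assumes the map passed to configure(...) is the same consumer properties map that the customizer wrote the "test.service.class" entry into, which I have not confirmed for the binder. To keep it runnable on its own, InterceptorConfigSketch is a hypothetical, dependency-free stand-in for MyInterceptor (it does not implement the Kafka interface, only a configure method with the same signature), and TestService here is a placeholder for the real bean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical placeholder for the Spring-managed bean from the question.
class TestService {
    String ping() {
        return "pong";
    }
}

// Dependency-free stand-in for MyInterceptor. Only configure(...) is modelled;
// its signature matches the configure(Map<String, ?>) method in the question.
class InterceptorConfigSketch {
    // Same key the ConsumerConfigCustomizer puts into the consumer properties.
    static final String TEST_SERVICE_KEY = "test.service.class";

    private TestService testService;

    public void configure(Map<String, ?> configs) {
        // The entry is an object reference, not a class name, so it is
        // looked up and cast rather than instantiated.
        Object candidate = configs.get(TEST_SERVICE_KEY);
        if (candidate instanceof TestService) {
            this.testService = (TestService) candidate;
        }
    }

    TestService getTestService() {
        return testService;
    }
}

class InterceptorConfigDemo {
    public static void main(String[] args) {
        TestService bean = new TestService();

        // Simulates what the customizer does to the consumer properties.
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(InterceptorConfigSketch.TEST_SERVICE_KEY, bean);

        InterceptorConfigSketch interceptor = new InterceptorConfigSketch();
        interceptor.configure(consumerProperties);

        // Prints "true" when the same bean instance was picked up.
        System.out.println(interceptor.getTestService() == bean);
    }
}
```

If the entry never reaches configure(...), the field stays null, which is the symptom described above; logging the keys of the config map inside configure(...) would be one way to check whether the customizer's entry arrives at all.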

Any suggestions would be helpful. Thanks.
