MR_K
MR_K

Reputation: 159

processing strategy of message in spring kafka listener

Just wanted to make sure that whether messages are processed in correct way or not. When the message gets received at listener, it will be always processed by a new thread( defined the processor bean as prototype). is this implementation correct ? (i have Considered the listener is not thread safe, so for this reason the prototype scope of bean to process the message has been used)

(Input : TestTopic- 5 partitions - 1 consumer) or (Input : TestTopic- 5 partitions - 5 consumers)

public class EventListener {

    @Autowired
    private EventProcessor eventProcessor;

    @KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "true")
    public void onMessage(
            @Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
    }

}

//event processor

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {

    @Autowired
    private KafkaProducerTemplate kafkaProducerTemplate;

    @Autowired
    private ObjectMapper localObjectMapper;

    @Autowired
    private Dao dao;

    public void processAndAcknowledgeBatchMessages(
            List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        long start = System.currentTimeMillis();
        consumerRecords.forEach( consumerRecord -> {
            Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
            dao.save(process(event));
        });
        acknowledgment.acknowledge();
    }
}

Upvotes: 0

Views: 574

Answers (1)

Gary Russell
Gary Russell

Reputation: 174799

No it is not correct; you should not execute on another thread; it will cause problems with committing offsets and error handling.

Also, making the EventProcessorImpl a prototype bean won't help. That just means a new instance is used each time the bean is referenced.

Since it is @Autowired it is only referenced once, during initialization. To get a new instance for each request, you would need to call getBean() on the application context each time.

It is better to make your code thread-safe.

EDIT

There are (at least) a couple of ways to deal with a not thread-safe service defined in prototype scope.

  1. Use a ThreadLocal:
@SpringBootApplication
public class So68447863Application {

    public static void main(String[] args) {
        SpringApplication.run(So68447863Application.class, args);
    }

    private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();

    @Autowired
    ApplicationContext context;

    @KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
    void listen(String in) {
        NotThreadSafeService service = SERVICES.get();
        if (service == null) {
            service = this.context.getBean(NotThreadSafeService.class);
            SERVICES.set(service);
        }
        service.process(in);
    }

    @EventListener
    void removeService(ConsumerStoppedEvent event) {
        System.out.println("Consumer stopped; removing TL");
        SERVICES.remove();
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    NotThreadSafeService service() {
        return new NotThreadSafeService();
    }

}

class NotThreadSafeService {

    void process(String msg) {
        System.out.println(msg + " processed by " + this);
    }

}
  1. Use a pool of instances.
@SpringBootApplication
public class So68447863Application {

    public static void main(String[] args) {
        SpringApplication.run(So68447863Application.class, args);
    }

    private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();

    @Autowired
    ApplicationContext context;

    @KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
    void listen(String in) {
        NotThreadSafeService service = SERVICES.poll();
        if (service == null) {
            service = this.context.getBean(NotThreadSafeService.class);
        }
        try {
            service.process(in);
        }
        finally {
            SERVICES.add(service);
        }
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    NotThreadSafeService service() {
        return new NotThreadSafeService();
    }

}

class NotThreadSafeService {

    void process(String msg) {
        System.out.println(msg + " processed by " + this);
    }

}

Upvotes: 1

Related Questions