Reputation: 159
I want to check whether my messages are being processed correctly. When a message is received by the listener, it should always be processed by a new thread (I defined the processor bean as a prototype). Is this implementation correct? (I assumed the listener is not thread-safe, which is why I gave the bean that processes the message prototype scope.)
Input: TestTopic with 5 partitions, consumed by either 1 consumer or 5 consumers.
public class EventListener {
@Autowired
private EventProcessor eventProcessor;
@KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
autoStartup = "true")
public void onMessage(
@Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
}
}
//event processor
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {
@Autowired
private KafkaProducerTemplate kafkaProducerTemplate;
@Autowired
private ObjectMapper localObjectMapper;
@Autowired
private Dao dao;
public void processAndAcknowledgeBatchMessages(
List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
long start = System.currentTimeMillis();
consumerRecords.forEach( consumerRecord -> {
Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
dao.save(process(event));
});
acknowledgment.acknowledge();
}
}
Upvotes: 0
Views: 574
Reputation: 174799
No, it is not correct. You should not execute on another thread; it will cause problems with committing offsets and with error handling.
Also, making the EventProcessorImpl a prototype bean won't help. That just means a new instance is used each time the bean is referenced. Since it is @Autowired, it is only referenced once, during initialization. To get a new instance for each request, you would need to call getBean() on the application context each time.
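To illustrate why injecting a prototype once does not give a new instance per message, here is a plain-Java analogy with no Spring involved. The Supplier stands in for the prototype bean definition; Processor, InjectedOnceListener and LookupPerMessageListener are hypothetical names made up for this sketch, not Spring types.

```java
import java.util.function.Supplier;

class PrototypeScopeDemo {
    public static void main(String[] args) {
        InjectedOnceListener injected = new InjectedOnceListener(Processor::new);
        LookupPerMessageListener lookup = new LookupPerMessageListener(Processor::new);
        System.out.println(injected.onMessage() == injected.onMessage()); // true
        System.out.println(lookup.onMessage() == lookup.onMessage());     // false
    }
}

// Stand-in for a prototype-scoped bean: each factory call builds a new instance.
class Processor {
}

// Analogous to @Autowired field injection: the factory is consulted once, at construction.
class InjectedOnceListener {
    private final Processor processor;

    InjectedOnceListener(Supplier<Processor> factory) {
        this.processor = factory.get();
    }

    Processor onMessage() {
        return processor; // the same instance for every message
    }
}

// Analogous to calling getBean() per message: the factory is consulted on every call.
class LookupPerMessageListener {
    private final Supplier<Processor> factory;

    LookupPerMessageListener(Supplier<Processor> factory) {
        this.factory = factory;
    }

    Processor onMessage() {
        return factory.get(); // a fresh instance for every message
    }
}
```

The first listener behaves like the code in the question: the prototype scope only affects how many instances exist across injection points, not how many are used per message.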
It is better to make your code thread-safe.
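As a rough sketch of what "thread-safe" can mean here: keep per-message state in local variables and put any shared state behind a concurrency-safe type. StatelessProcessor and its methods are hypothetical names for illustration only; they are not taken from the question's EventProcessorImpl, and the five worker threads merely imitate five consumer threads.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ThreadSafeProcessorDemo {
    public static void main(String[] args) throws InterruptedException {
        StatelessProcessor processor = new StatelessProcessor(); // one shared instance
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int t = 0; t < 5; t++) {
            pool.execute(() -> processor.processBatch(List.of("a", "b", "c")));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(processor.processedCount()); // 15
    }
}

class StatelessProcessor {
    // The only shared mutable state, guarded by an atomic type.
    private final AtomicLong processed = new AtomicLong();

    // Everything specific to one batch lives in local variables,
    // so concurrent calls cannot interfere with each other.
    int processBatch(List<String> values) {
        int totalLength = 0;
        for (String value : values) {
            totalLength += value.trim().length();
            processed.incrementAndGet();
        }
        return totalLength;
    }

    long processedCount() {
        return processed.get();
    }
}
```

With a processor written this way, a single singleton bean can be shared by all consumer threads and no prototype scope is needed.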
EDIT
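There are (at least) a couple of ways to deal with a non-thread-safe service defined in prototype scope. The first example below keeps one instance per consumer thread in a ThreadLocal; the second keeps a pool of instances in a blocking queue.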
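// Option 1: one service instance per consumer thread, held in a ThreadLocal.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.event.ConsumerStoppedEvent;
@SpringBootApplication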
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.get();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
SERVICES.set(service);
}
service.process(in);
}
@EventListener
void removeService(ConsumerStoppedEvent event) {
System.out.println("Consumer stopped; removing TL");
SERVICES.remove();
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
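// Option 2: a pool of service instances, held in a blocking queue.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
@SpringBootApplication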
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.poll();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
}
try {
service.process(in);
}
finally {
SERVICES.add(service);
}
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
Upvotes: 1