Reputation: 3103
I have implemented JMS with Spring Boot, and I am using @JmsListener to listen to a topic:
    @Component
    public class AMQListner {
        BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);

        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                queue.add(bo);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
Now I want a listener that watches that blocking queue and processes each value as soon as one is available. Can this be achieved with an annotation in Spring Boot?
Upvotes: 2
Views: 3751
Reputation: 15008
First of all, the proper way is to create a handler bean and delegate to it, instead of keeping the message queue as a member of the receiver class:
    public interface MessageHandler extends Consumer<MessageBO> {
        // Convenience alias so callers can say handle(msg) instead of accept(msg)
        public default void handle(MessageBO msg) { accept(msg); }
    }

    @Component
    public class AMQListener {
        // @Resource selects the bean by name; the attribute is "name", there is no "value"
        @Resource(name = "multiplexer")
        MessageHandler handler;

        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                // Hand the parsed message over; the listener itself holds no queue
                handler.handle(bo);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
Then you would have the queue in the handler bean:
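    @Component("multiplexer")
    public class MessageMultiplexer implements MessageHandler {
        // The bean that does the real work; add a @Qualifier if several MessageHandler beans exist
        @Autowired
        MessageHandler actualConsumer;

        ExecutorService executor = Executors.newFixedThreadPool(4);

        @Override
        public void accept(MessageBO msg) {
            // submit() takes a Runnable, so the lambda has no parameter and captures msg
            executor.submit(() -> actualConsumer.handle(msg));
        }
    }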
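The executor is pretty much the queue in this case: its internal work queue buffers the submitted messages, and the pool threads pick them up as soon as they are free, so no separate listener on a BlockingQueue is needed.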
Caveat: you do not get your 1024 limit this way, because Executors.newFixedThreadPool uses an unbounded work queue. You can restore the limit by calling the ThreadPoolExecutor constructor directly and passing it a bounded queue.
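As a rough sketch of that last point, the executor could be built along these lines. The class name `BoundedExecutorSketch` and the helpers `newBoundedExecutor` and `processAll` are made up for illustration, and the choice of `CallerRunsPolicy` is an assumption: it makes the submitting thread run the task itself when the queue is full, whereas the default `AbortPolicy` would throw a `RejectedExecutionException` instead (closer to what `queue.add` does in the question).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorSketch {

    // Hypothetical helper: fixed-size pool whose work queue holds at most `capacity` pending tasks.
    static ThreadPoolExecutor newBoundedExecutor(int threads, int capacity) {
        return new ThreadPoolExecutor(
                threads, threads,              // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,     // keep-alive for threads above core size (none here)
                new ArrayBlockingQueue<>(capacity),
                // Assumed policy: when the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits n trivial tasks to a 4-thread pool with a 1024-slot queue and returns how many ran.
    static int processAll(int n) {
        ThreadPoolExecutor executor = newBoundedExecutor(4, 1024);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            executor.execute(processed::incrementAndGet);
        }
        executor.shutdown();                   // accept no new tasks, finish the queued ones
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(5000));
    }
}
```

In the MessageMultiplexer above, the `executor` field would then be initialised with something like `newBoundedExecutor(4, 1024)` instead of `Executors.newFixedThreadPool(4)`. With `CallerRunsPolicy`, a full queue slows down the JMS listener thread rather than dropping messages; whether that is the behaviour you want depends on your application.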
Upvotes: 2