Shahid Ghafoor

Reputation: 3103

spring boot and BlockingQueue listener

I have implemented JMS with Spring Boot, and I am using @JmsListener to listen to a topic:

    @Component
    public class AMQListner {
        BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                queue.add(bo);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

Now I want a listener that watches that blocking queue and processes each value as soon as one is available. Can this be achieved using annotations in Spring Boot?

Upvotes: 2

Views: 3751

Answers (1)

daniu

Reputation: 15008

First of all, the proper way is to create a handler bean instead of having a member with the message queue in the receiver class.

public interface MessageHandler extends Consumer<MessageBO> {
    public default void handle(MessageBO msg) { accept(msg); }
}

@Component
public class AMQListener {
    @Resource(name = "multiplexer")
    MessageHandler handler;

    @JmsListener(destination = "${spring.activemq.topic}")
    public void Consume(TextMessage message) {
        try {
            String json = message.getText();
            MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
            handler.handle(bo);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Then you would have the queue in the handler bean:

@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
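    // The bean that does the real work. If several MessageHandler beans exist,
    // a @Qualifier may be needed here to select the intended one.
    @Autowired
    MessageHandler actualConsumer;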

    ExecutorService executor = Executors.newFixedThreadPool(4);
    @Override
    public void accept(MessageBO msg) {
        // submit() expects a no-argument task; the lambda captures msg from this method
        executor.submit(() -> actualConsumer.handle(msg));
    }
}

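The executor is pretty much the queue in this case: submitted tasks wait in its internal work queue, and its worker threads play the role of the listener that picks them up.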
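To illustrate the executor-as-queue idea outside of Spring, here is a minimal standalone sketch. The class and method names (ExecutorAsQueueSketch, processAll) are made up for this example, and plain strings stand in for MessageBO. It uses a single worker thread so that messages are handled one at a time in submission order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class ExecutorAsQueueSketch {

    // Hands every message to the executor and returns them in the order
    // in which the worker thread actually processed them.
    static List<String> processAll(List<String> messages) {
        List<String> processed = new CopyOnWriteArrayList<>();
        // One worker thread: tasks wait in the executor's internal queue
        // and are executed sequentially.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String msg : messages) {
            // The "producer" side: returns immediately, like queue.add(...)
            executor.execute(() -> processed.add(msg));
        }
        executor.shutdown(); // stop accepting new tasks, finish queued ones
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a", "b", "c")));
    }
}
```

With more than one worker thread (as in the fixed pool of 4 above), messages may be processed concurrently, so processing order is no longer guaranteed.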

Caveat: this does not give you your 1024 limit, because Executors.newFixedThreadPool uses an unbounded work queue. To get the limit back, construct a ThreadPoolExecutor directly and pass it a bounded queue.
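A minimal sketch of such a bounded executor, assuming a fixed number of threads and an ArrayBlockingQueue as the work queue. The names BoundedExecutorSketch, newBoundedExecutor and countAcceptedBeforeRejection are hypothetical and exist only for this illustration. With the default rejection policy (AbortPolicy), submitting to a full executor throws RejectedExecutionException, which is comparable to queue.add(...) throwing when the original ArrayBlockingQueue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class BoundedExecutorSketch {

    // Fixed number of worker threads, at most `capacity` tasks waiting in the queue.
    static ThreadPoolExecutor newBoundedExecutor(int threads, int capacity) {
        return new ThreadPoolExecutor(
                threads, threads,              // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,     // keep-alive for idle extra threads (none here)
                new ArrayBlockingQueue<>(capacity));
    }

    // Submits tasks that block on a latch until the executor rejects one,
    // then returns how many tasks were accepted.
    static int countAcceptedBeforeRejection(ThreadPoolExecutor executor) {
        CountDownLatch gate = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                executor.submit(() -> {
                    try {
                        gate.await(); // keep the worker busy so the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: all threads are busy and the queue is full.
        } finally {
            gate.countDown();   // release the blocked tasks
            executor.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 4 busy threads plus 1024 queued tasks are accepted before rejection.
        System.out.println(countAcceptedBeforeRejection(newBoundedExecutor(4, 1024)));
    }
}
```

If rejecting messages is not acceptable, one option is to pass a ThreadPoolExecutor.CallerRunsPolicy as the rejection handler, so that the submitting thread (here, the JMS listener thread) runs the task itself and is slowed down while the queue is full.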

Upvotes: 2
