Reputation: 109
I have the following configuration that I am trying to change to support priority queues. From what I have read, I should have two queues, one for each priority. I changed my configuration from the following:
@Configuration
public class FixedReplyQueueConfig {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("urbanbuz");
        connectionFactory.setPassword("ub");
        connectionFactory.setVirtualHost("urbanbuzvhost");
        return connectionFactory;
    }

    /**
     * @return Rabbit template with fixed reply queue.
     */
    @Bean
    public RabbitTemplate fixedReplyQRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setExchange(ex().getName());
        template.setRoutingKey("test");
        template.setReplyQueue(replyQueue());
        return template;
    }

    /**
     * @return The reply listener container - the rabbit template is the listener.
     */
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(fixedReplyQRabbitTemplate());
        return container;
    }

    /**
     * @return The listener container that handles the request and returns the reply.
     */
    @Bean
    public SimpleMessageListenerContainer serviceListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueues(requestQueue());
        container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
        return container;
    }

    @Bean
    public DirectExchange ex() {
        return new DirectExchange("ub.exchange", false, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(requestQueue()).to(ex()).with("test");
    }

    @Bean
    public Queue requestQueue() {
        return new Queue("ub.request");
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("ub.reply");
    }

    /**
     * @return an admin to handle the declarations.
     */
    @Bean
    public RabbitAdmin admin() {
        return new RabbitAdmin(rabbitConnectionFactory());
    }
}
to the following:
@Configuration
public class FixedReplyQueueConfig {

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("urbanbuz");
        connectionFactory.setPassword("ub");
        connectionFactory.setVirtualHost("urbanbuzvhost");
        return connectionFactory;
    }

    /**
     * @return Rabbit template with fixed reply queue.
     */
    @Bean
    public RabbitTemplate fixedReplyQRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setExchange(ex().getName());
        // A template holds a single default routing key, so the second call
        // overrides the first. The test further down passes the routing key
        // explicitly on every send.
        template.setRoutingKey("high");
        template.setRoutingKey("normal");
        template.setReplyQueue(replyQueue());
        return template;
    }

    /**
     * @return The reply listener container - the rabbit template is the listener.
     */
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(fixedReplyQRabbitTemplate());
        return container;
    }

    /**
     * @return The listener container that handles the request and returns the reply.
     */
    @Bean
    public SimpleMessageListenerContainer serviceListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueues(requestQueueHigh(), requestQueue());
        container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
        return container;
    }

    @Bean
    public DirectExchange ex() {
        return new DirectExchange("ub.exchange", false, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(requestQueue()).to(ex()).with("normal");
    }

    @Bean
    public Binding bindingHigh() {
        return BindingBuilder.bind(requestQueueHigh()).to(ex()).with("high");
    }

    @Bean
    public Queue requestQueue() {
        return new Queue("ub.request");
    }

    @Bean
    public Queue requestQueueHigh() {
        return new Queue("ub.request.high");
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("ub.reply");
    }

    /**
     * @return an admin to handle the declarations.
     */
    @Bean
    public RabbitAdmin admin() {
        return new RabbitAdmin(rabbitConnectionFactory());
    }
}
Am I going about this the correct way? And how should I proceed to make the consumer prefer one queue over the other?
This is how I call to test:
public class App {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(FixedReplyQueueConfig.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        rabbitTemplate.convertSendAndReceive("ub.exchange", "normal", "yalla");
        rabbitTemplate.convertSendAndReceive("ub.exchange", "high", "hello");
    }
}
And this is my pojo class:
public class PojoListener {
    public String handleMessage(String foo) {
        System.out.println("IN MESSAGE RECEIVER");
        return "it's the weekend!!!!!!!!!!!!";
    }
}
Both messages are sent through; however, I am not sure how priority is applied with this configuration and these classes.
Note that I'm trying to avoid any plugins to support priority.
Upvotes: 2
Views: 749
Reputation: 121550
The code from Spring AMQP (BlockingQueueConsumer) looks like:
for (String queueName : queues) {
    if (!this.missingQueues.contains(queueName)) {
        consumeFromQueue(queueName);
    }
}
So, if your config is:
container.setQueues(requestQueueHigh(), requestQueue());
You are really going to receive messages from requestQueueHigh() ahead of those from requestQueue(), and that is independent of the concurrency setting on the ListenerContainer.
You can test that with several messages: keep the listener container stopped from the start of the application, send the messages to both queues, and only then call start() on the listener container.
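To make the ordering you would look for in such a test concrete, here is a minimal in-memory sketch in plain Java. It models only the policy "always take from the high-priority queue first". The class and method names (TwoQueuePrioritySketch, drainByPriority) are made up for illustration, and it does not use Spring AMQP or reproduce how a RabbitMQ broker actually delivers messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: two in-memory queues stand in for
// "ub.request.high" and "ub.request". No broker or Spring AMQP involved.
public class TwoQueuePrioritySketch {

    /**
     * Drains both queues, checking the high-priority queue before the
     * normal one on every iteration, and returns messages in consumed order.
     */
    static List<String> drainByPriority(BlockingQueue<String> high,
                                        BlockingQueue<String> normal) {
        List<String> consumed = new ArrayList<>();
        while (true) {
            String message = high.poll();      // prefer the high-priority queue
            if (message == null) {
                message = normal.poll();       // fall back to the normal queue
            }
            if (message == null) {
                break;                         // both queues are empty
            }
            consumed.add(message);
        }
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> high = new LinkedBlockingQueue<>();
        BlockingQueue<String> normal = new LinkedBlockingQueue<>();

        // "Send" while the consumer is stopped, interleaving both priorities.
        normal.add("yalla-1");
        high.add("hello-1");
        normal.add("yalla-2");
        high.add("hello-2");

        // "Start" the consumer only after everything has been queued.
        System.out.println(drainByPriority(high, normal));
        // prints [hello-1, hello-2, yalla-1, yalla-2]
    }
}
```

Because the high queue is polled first on every iteration, a high-priority message is always picked before any waiting normal one. Whether the real listener container gives you the same ordering is exactly what the stopped-then-started test is meant to show.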
Upvotes: 1