Reputation: 694
Message priorities in queues in RabbitMQ. It is working with rabbitmq provided java client. But it does not work with spring-rabbit dependency. Please have a look.
Using RabbitMQ Java Client
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.samples</groupId>
<artifactId>RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<developers>
<developer>
<name>Sagar Rout</name>
</developer>
</developers>
<properties>
<!-- Generic properties -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Spring -->
<spring-framework.version>4.3.2.RELEASE</spring-framework.version>
</properties>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<!-- Spring AMQP -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Publisher.java
public class Publisher {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
String message = "Hello World!";
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME,
new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(),
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'" + "priority" + i);
}
channel.close();
connection.close();
}}
Consumer.Java
public class Consumer {
private final static String QUEUE_NAME = "S1_Priority";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'" + properties.getPriority());
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}}
This is working and the message with higher priority is coming up. But it does not work with Spring-rabbit. Please find the code.
RabbitMQConfig.class
@Configuration
@ComponentScan(basePackages = { "com.blackocean.*" })
@PropertySource("classpath:config.properties")
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private Integer port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
@Value("${rabbitmq.connection.size}")
private Integer connectionSize ;
@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
return new PropertySourcesPlaceholderConfigurer();
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setConnectionLimit(connectionSize);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
Queue queue = new Queue("myQueue", true, false, false, args) ;
return queue ;
}}
SendUsingJavaConfig
public class Send1UsingJavaConfig {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(9);
return message;
}
});
}
}
ReceiveusingJavaConfig
public class RecvUsingJavaConfig {
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// Basic Example
String message = (String) rabbitTemplate.receiveAndConvert("myQueue");
System.out.println(message);
}}
Config.properties
#RabbitMQ
rabbitmq.host=localhost
#Always provide port and connection size in numbers
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.connection.size=100
Now I am sending the message with different priority but it always receives the message in the order. Any suggestion will be great !!!
Upvotes: 1
Views: 3064
Reputation: 123
When using spring-boot amqp, it is important to set
spring.rabbitmq.listener.simple.prefetch=1
Otherwise spring-boot is fetching 250 messages absolutely ignoring priorities.
Upvotes: 1
Reputation: 274
If anyone has similar requirements on message priority then you need to define priority (Configuration Class) before the queue is created. If you plan to apply the config for existing queues it will not work (from my testing).
@Value("${myApp.rabbitmq.queue}")
private String queueName;
@Bean
Queue queue(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
Queue queue = new Queue(queueName, true, false, false, args) ;
return queue ;
}
When you push messages into the queue make sure the priority doesn't exceed 10 as we have defined max priority on the queue as 10.
BR, Santhosh
Upvotes: -1
Reputation: 4948
Just a guess here , i tried looking into an old AMQP library i had used (priority queue in an older version of Rabbit MQ).
The priority was set as below
args.put("x-max-priority", 10);
, it looks slightly different from args.put("x-priority", 10);
.
You could refer the old priority queue repo in the link. You could try to see if that helps
Upvotes: 4