Reputation: 735
I am new to spring and working on a cloud based application and trying to use RabbitTemplate and RabbitMQ.
I am able to store data to queue using.
rabbitTemplate.convertAndSend(QUEUE_NAME, msg);
But when i am receiving the data from the same queue using
rabbitTemplate.receiveAndConvert(QUEUE_NAME)
I am getting exception as:
ERR Caused by: java.io.IOException 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.ChannelN.basicGet(ChannelN.java:985) 2016-09-13T11:15:21.38+0530 [App/0] ERR at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2016-09-13T11:15:21.38+0530 [App/0] ERR at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2016-09-13T11:15:21.38+0530 [App/0] ERR at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2016-09-13T11:15:21.38+0530 [App/0] ERR at java.lang.reflect.Method.invoke(Method.java:498) 2016-09-13T11:15:21.38+0530 [App/0] ERR at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:625) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.sun.proxy.$Proxy55.basicGet(Unknown Source) 2016-09-13T11:15:21.38+0530 [App/0] ERR at org.springframework.amqp.rabbit.core.RabbitTemplate$4.doInRabbit(RabbitTemplate.java:650) 2016-09-13T11:15:21.38+0530 [App/0] ERR at org.springframework.amqp.rabbit.core.RabbitTemplate$4.doInRabbit(RabbitTemplate.java:646) 2016-09-13T11:15:21.38+0530 [App/0] ERR at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045) 2016-09-13T11:15:21.38+0530 [App/0] ERR ... 50 more 2016-09-13T11:15:21.38+0530 [App/0] ERR Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'testqueue' in vhost '9cc1b4db-636e-4251-bb68-c7ed7f3be1d3', class-id=60, method-id=70) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 2016-09-13T11:15:21.38+0530 [App/0] ERR ... 60 more 2016-09-13T11:15:21.38+0530 [App/0] ERR Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'testqueue' in vhost '9cc1b4db-636e-4251-bb68-c7ed7f3be1d3', class-id=60, method-id=70) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) 2016-09-13T11:15:21.38+0530 [App/0] ERR at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
My Code:
@Configuration
@Profile("cloud")
public class RabbitConfig extends AbstractCloudConfig {
@Bean
public RabbitTemplate rabbitTemplate(){
CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory)connectionFactory().rabbitConnectionFactory();
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
return rabbitTemplate;
}
}
My Controller:
@RestController
@RequestMapping("mq")
public class MainController {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "testqueue";
@RequestMapping(value = "/putinq/{msg}",method = RequestMethod.PUT)
public String storeMessage(@PathVariable("msg") String msg){
String result = "";
rabbitTemplate.setQueue(QUEUE_NAME);
try {
rabbitTemplate.convertAndSend(QUEUE_NAME, msg); // no exception
Thread.sleep(3000);
Object object = rabbitTemplate.receiveAndConvert(QUEUE_NAME); // getting exception here
System.out.println("Received: "+object);
result = "success";
}catch(Exception ex){
ex.printStackTrace();
}
return result;
}
}
UPDATED RabbitConfig.java
@Configuration
@Profile("cloud")
public class RabbitConfig extends AbstractCloudConfig {
private static final String QUEUE_NAME = "testqueue";
@Bean
public RabbitTemplate rabbitTemplate(){
CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory)(connectionFactory().rabbitConnectionFactory());
System.out.println("------------------------ Rabbit mq template: " + cachingConnectionFactory.getCacheMode());
System.out.println("------------------------ cachingConnectionFactory.toString(): " + cachingConnectionFactory.toString());
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
cachingConnectionFactory.setChannelCacheSize(25);
System.out.println("----------after set-------------- Rabbit mq template: " + cachingConnectionFactory.getCacheMode());
System.out.println("------------------------ cachingConnectionFactory.toString(): " + cachingConnectionFactory.getChannelCacheSize());
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
System.out.println("------------------------ Rabbit mq template: " + rabbitTemplate);
return rabbitTemplate;
}
@Bean
public Queue myQueue() {
System.out.println("--------------@@---------- creating queue: ");
final boolean isDurable = true;
final boolean isExclusive = false;
final boolean autoDelete = false;
return new Queue(QUEUE_NAME, isDurable, isExclusive, autoDelete);
}
}
I tried searching this on the internet but still i am not able to resolve the issue, Struggling with this exception for long, any help is appreciated.
Got the solution!
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setUsername("");
cachingConnectionFactory.setPassword("");
cachingConnectionFactory.setVirtualHost("");
cachingConnectionFactory.setHost("");
cachingConnectionFactory.setPort(1);
cachingConnectionFactory.setRequestedHeartBeat(30);
cachingConnectionFactory.setConnectionTimeout(30000);
RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory());
Queue queue = new Queue(QUEUE_NAME);
admin.declareQueue(queue);
TopicExchange exchange = new TopicExchange(EXCHANGE_NAME);
admin.declareExchange(exchange); admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME));
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory());
Upvotes: 6
Views: 14231
Reputation: 174574
You need a RabbitAdmin
@Bean
to declare the queues/bindings - it automatically finds beans of those types and declares them on the broker when the connection is established.
If you use Spring Boot, it will automatically register a template and admin for you.
Upvotes: 5
Reputation: 4476
You need binding your queue and the exchage, please try this RabbitmqConfig below
@Configuration
public class RabbitmqConfig {
private static final String QUEUE_NAME = "testqueue";
@Bean
Queue queue() {
return new Queue(QUEUE_NAME, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange", true, false);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
}
Upvotes: 1
Reputation: 1955
You need to define the queue itself in your configuration.
@Bean
public org.springframework.amqp.core.Queue myQueue() {
final boolean isDurable = true;
final boolean isExclusive = false;
final boolean autoDelete = false;
return new org.springframework.amqp.core.Queue(QUEUE_NAME, isDurable, isExclusive, autoDelete);
}
Upvotes: 2