Reputation: 451
I'm having some issues creating a new queue in RabbitMQ. I'm only creating a consumer client that will consume messages coming from another microservice.
Here is what I've done so far.
application.properties:
spring.rabbitmq.addresses=SCGLCCRAMQD0005.SCGER.DEV.CORP:5672
spring.rabbitmq.username=people-consumer
spring.rabbitmq.password=*************
spring.rabbitmq.vhost=PEOPLE
peopleevents.queue=qu-people-cores-update
peopleevents.exchange=ex-people-updates
peopleevents.routingkey=ONLINE.UPDATE.PERSONF.PERSONF
The config class:
/**
 * RabbitMQ wiring for the people-events consumer.
 *
 * <p>Reads the queue, exchange and routing key from the {@code peopleevents.*}
 * properties and the broker connection settings from the {@code spring.rabbitmq.*}
 * properties, then exposes the queue, exchange, binding, message converter,
 * connection factory and templates as beans.
 */
@Configuration
public class MQConfig {

    /** Queue name, from {@code peopleevents.queue}. */
    @Value("${peopleevents.queue}")
    public String queue;

    /** Exchange name, from {@code peopleevents.exchange}. */
    @Value("${peopleevents.exchange}")
    public String exchange;

    /** Routing key used for the binding, from {@code peopleevents.routingkey}. */
    @Value("${peopleevents.routingkey}")
    public String routingKey;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.addresses}")
    private String address;

    @Value("${spring.rabbitmq.vhost}")
    private String vHost;

    /**
     * Describes the queue this service consumes from.
     *
     * <p>NOTE(review): presumably an admin in the context will try to declare this
     * queue (and the exchange and binding below) on the broker, which needs the
     * broker user to hold "configure" permission on them - confirm against the
     * ACCESS_REFUSED error seen at startup.
     *
     * @return the queue definition (arguments: name, durable=true, exclusive=false,
     *         autoDelete=false)
     */
    @Bean
    public Queue queue() {
        return new Queue(queue, true, false, false);
    }

    /**
     * Describes the topic exchange the queue is bound to.
     *
     * @return a topic exchange named after {@code peopleevents.exchange}
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    /**
     * Binds the queue to the topic exchange using the configured routing key.
     *
     * @param queue    the queue bean to bind
     * @param exchange the topic exchange bean to bind to
     * @return the binding definition
     */
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey);
    }

    /**
     * Message converter used by {@link #rabbitTemplate(ConnectionFactory)}.
     *
     * @return a Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Template with the mandatory flag enabled; no message converter is set on it here.
     *
     * <p>NOTE(review): this bean and {@link #rabbitTemplate(ConnectionFactory)} are both
     * {@code RabbitTemplate} instances, so injecting by type may be ambiguous -
     * confirm whether both are really needed.
     *
     * @param connectionFactory the connection factory to send through
     * @return the configured template
     */
    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    /**
     * Builds the connection factory from the {@code spring.rabbitmq.*} properties
     * (addresses, username, password, virtual host) with publisher returns enabled.
     *
     * <p>No checked exception is declared: nothing in this method throws one, and
     * declaring it would force every in-class caller to handle it.
     *
     * @return the caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(address);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Template that converts message payloads with the JSON message converter.
     *
     * @param connectionFactory the connection factory to send through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
And the listener class:
/**
 * Consumes messages from the queue named by the {@code peopleevents.queue} property
 * and logs each one.
 */
@Service
public class RabbitMQService {

    // One logger per class is enough; there is no need for one per instance.
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

    /**
     * Logs every message received on the configured queue.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "${peopleevents.queue}")
    public void receivedMessage(@Payload Message message) {
        // Parameterized logging: the message is only stringified if INFO is enabled.
        logger.info("User Details Received is.. {}", message);
    }
}
When I run this, I get an ACCESS_REFUSED error, but I don't know why. Am I missing something?
Thanks!
Upvotes: 0
Views: 1583
Reputation: 2992
You can use RabbitAdmin to create queues dynamically. To do so, add a RabbitAdmin bean to your configuration class:
/**
 * Exposes a {@code RabbitAdmin} that can declare queues, exchanges and bindings.
 *
 * <p>The connection factory is injected as a parameter, like the other bean methods
 * that need it, instead of calling {@code connectionFactory()} directly; a direct
 * call would have to handle any checked exception that method declares.
 *
 * @param connectionFactory the connection factory bean the admin works through
 * @return the admin bean
 */
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}
Then you can autowire the admin (as an AmqpAdmin) together with the Binding and Queue beans:
// Admin bean used to declare broker resources.
@Autowired
private AmqpAdmin admin;
// Binding definition to declare.
@Autowired
private Binding binding;
// Queue definition to declare.
@Autowired
private Queue queue;
Finally you can create queues as you like:
// Declare the queue through the admin, then declare the binding.
admin.declareQueue(queue);
admin.declareBinding(binding);
You can create the queue in your configuration or service class.
You can implement it in your service class within a constructor:
/**
 * Declares the queue and its binding when the service is constructed, then consumes
 * messages from the queue named by the {@code peopleevents.queue} property and logs
 * each one.
 */
@Service
public class RabbitMQService {

    // One logger per class is enough; there is no need for one per instance.
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQService.class);

    // Assigned once in the constructor and never reassigned.
    private final AmqpAdmin admin;
    private final Binding binding;
    private final Queue queue;

    /**
     * Stores the injected beans and declares the queue, then the binding, through the admin.
     *
     * @param admin   the admin used to declare the queue and binding
     * @param binding the binding definition to declare
     * @param queue   the queue definition to declare
     */
    @Autowired
    public RabbitMQService(AmqpAdmin admin, Binding binding, Queue queue) {
        this.admin = admin;
        this.binding = binding;
        this.queue = queue;
        admin.declareQueue(queue);
        admin.declareBinding(binding);
    }

    /**
     * Logs every message received on the configured queue.
     *
     * @param message the received message
     */
    @RabbitListener(queues = "${peopleevents.queue}")
    public void receivedMessage(@Payload Message message) {
        // Parameterized logging: the message is only stringified if INFO is enabled.
        logger.info("User Details Received is.. {}", message);
    }
}
Upvotes: 2