Reputation: 459
I'm trying to implement a gateway in Spring Boot that exposes REST endpoints and publishes the incoming messages to a RabbitMQ broker. I need to handle errors, so I configured a replyAddress backed by a DLQ, and a SimpleMessageListenerContainer that uses my RabbitTemplate as its listener so that the template can consume the replyQueue.
It works fine with "hard-coded" beans:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setReceiveTimeout(0);
    template.setReplyTimeout(10000);
    template.setExchange("inputExchange");
    template.setRoutingKey("routing.1");
    template.setReplyAddress("replyQueue1");
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    DefaultClassMapper classMapper = new DefaultClassMapper();
    classMapper.setDefaultType(Event.class);
    messageConverter.setClassMapper(classMapper);
    template.setMessageConverter(messageConverter);
    return template;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("replyQueue1");
    // The template itself is the listener for the reply queue
    container.setMessageListener(rabbitTemplate(connectionFactory));
    return container;
}
But the goal of this gateway is to be entirely configurable, so that I don't have to code every route to a Rabbit exchange/queue.
For example, I have this configuration in a YAML file:
routes:
  service1:
    exchange: inputExchange
    queue: inputQueue1
    routing: routing.1
    replyQueue: replyQueue1
    dlExchange: reply.dlx1
    dlQueue: dlx.queue1.reply
    receiveTimeout: 0
    replyTimeout: 10000
    preProcessors: package.processor.LowercaseProcessor
    postProcessors: package.processor.UppercaseProcessor
  service2:
    exchange: inputExchange
    queue: inputQueue2
    routing: routing.2
So I need to create the RabbitTemplate and SimpleMessageListenerContainer dynamically, configuring the replyQueue, the DLQ, and so on for each service.
I tried with this code:
@Configuration
public class RabbitTemplatesConfiguration implements BeanFactoryAware {

    @Autowired
    private GatewayProperties properties;

    @Autowired
    private ConnectionFactory connectionFactory;

    private BeanFactory beanFactory;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @PostConstruct
    public void configure() {
        Assert.state(beanFactory instanceof ConfigurableBeanFactory, "wrong bean factory type");
        ConfigurableBeanFactory configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
        Map<String, ServiceProperties> routes = properties.getRoutes();
        if (routes != null) {
            for (String service : routes.keySet()) {
                ServiceProperties props = routes.get(service);
                createTemplate(configurableBeanFactory, service, props);
            }
        }
    }

    private void createTemplate(ConfigurableBeanFactory configurableBeanFactory, String service, ServiceProperties props) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(props.getExchange());
        template.setRoutingKey(props.getRouting());
        template.setReplyAddress(props.getReplyQueue());
        template.setReceiveTimeout(props.getReceiveTimeout());
        template.setReplyTimeout(props.getReplyTimeout());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        classMapper.setDefaultType(Event.class);
        messageConverter.setClassMapper(classMapper);
        template.setMessageConverter(messageConverter);
        configurableBeanFactory.registerSingleton(service + "Template", template);
        if (!StringUtils.isEmpty(props.getReplyQueue())) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames(props.getReplyQueue());
            container.setMessageListener(new MessageListenerAdapter(template));
            configurableBeanFactory.registerSingleton(service + "ListenerContainer", container);
            container.afterPropertiesSet(); // added this but not working either
            container.start(); // added this but not working either
        }
    }
}
But when I receive the response on the replyQueue, I get this error:
java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': replyQueue1
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithFixed(RabbitTemplate.java:1312)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1251)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1218)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1189)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1156)
So the SimpleMessageListenerContainer doesn't seem to be properly instantiated or configured.
Do you know what the problem is?
Here is my code for sending and receiving:
@Autowired
private ApplicationContext context;

@Autowired
private RabbitAdmin rabbitAdmin;

@Autowired
private GatewayProperties properties;

@PostMapping("/{service}")
public ResponseEntity<Object> call(@PathVariable("service") String service, @RequestBody Event body) {
    ServiceProperties serviceProperties = properties.getRoutes().get(service);
    Queue queue = QueueBuilder.durable(serviceProperties.getQueue()).build();
    rabbitAdmin.declareQueue(queue);
    TopicExchange exchange = new TopicExchange(serviceProperties.getExchange());
    rabbitAdmin.declareExchange(exchange);
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(serviceProperties.getRouting()));
    Queue replyQueue = null;
    if (!StringUtils.isEmpty(serviceProperties.getReplyQueue())) {
        replyQueue = QueueBuilder.durable(serviceProperties.getReplyQueue()).withArgument("x-dead-letter-exchange", serviceProperties.getDlExchange()).build();
        rabbitAdmin.declareQueue(replyQueue);
        Queue dlQueue = QueueBuilder.durable(serviceProperties.getDlQueue()).build();
        rabbitAdmin.declareQueue(dlQueue);
        TopicExchange dlqExchange = new TopicExchange(serviceProperties.getDlExchange());
        rabbitAdmin.declareExchange(dlqExchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(dlQueue).to(dlqExchange).with(serviceProperties.getReplyQueue()));
    }
    RabbitTemplate template = (RabbitTemplate) context.getBean(service + "Template");
    Event outputMessage = (Event) template.convertSendAndReceive(serviceProperties.getExchange(), serviceProperties.getRouting(), body, new CorrelationData(UUID.randomUUID().toString()));
    //...
}
Upvotes: 0
Views: 4005
Reputation: 174574
It's not clear why you are using a reply queue; RabbitMQ now provides a direct reply-to mechanism which removes most of the reasons for using a fixed reply queue (one exception is if you want an HA reply queue).
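For comparison, a template that relies on direct reply-to could look roughly like the sketch below. It assumes a Spring AMQP version and a broker that both support direct reply-to, where (as far as I know) a template with no reply address uses it by default. The class and method names (DirectReplyToSketch, directReplyToTemplate) are made up for illustration.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public final class DirectReplyToSketch {

    private DirectReplyToSketch() {
    }

    // Hypothetical helper: builds a request/reply template for one route
    // without any fixed reply queue.
    public static RabbitTemplate directReplyToTemplate(
            ConnectionFactory connectionFactory,
            String exchange,
            String routingKey,
            long replyTimeoutMillis) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(exchange);
        template.setRoutingKey(routingKey);
        template.setReplyTimeout(replyTimeoutMillis);
        // Deliberately no setReplyAddress(...) call: with no fixed reply
        // queue, no reply listener container has to be registered.
        return template;
    }
}
```

With this setup there is no reply queue to declare, so any dead-letter arguments configured on a queue such as replyQueue1 would no longer come into play.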
That said, the problem is that you are wrapping the template in a MessageListenerAdapter. This is not necessary (and won't work anyway); the template implements MessageListener.
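A minimal sketch of the corrected container setup, assuming the same queue names and properties as in the question; the class and method names (ReplyContainerSketch, replyContainerFor) are hypothetical. The only real change from the question's createTemplate method is that the template is passed to setMessageListener directly, exactly as in the hard-coded bean that already works.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public final class ReplyContainerSketch {

    private ReplyContainerSketch() {
    }

    // Hypothetical helper: builds the reply listener container for one route.
    public static SimpleMessageListenerContainer replyContainerFor(
            ConnectionFactory connectionFactory,
            RabbitTemplate template,
            String replyQueue) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replyQueue);
        // Pass the template itself, not new MessageListenerAdapter(template):
        // RabbitTemplate implements MessageListener.
        container.setMessageListener(template);
        return container;
    }
}
```

In createTemplate this would replace the three lines that construct the container; registering the singleton and starting the container can stay as they are.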
Upvotes: 2