Reputation: 40
I'm working on a project that involves two queues and multiple listeners interacting with them. Flow:
From what I've read, the best way to do this is with a fanout exchange. Here is my code:
listener-configuration.xml
<!-- Connection factory for the broker on localhost (guest/guest). -->
<rabbit:connection-factory id="connectionFactory"
    host="localhost" username="guest" password="guest" />

<!-- Admin used to declare the queues, exchange and bindings defined below. -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- Persistence queue.
     NOTE(review): auto-startup is kept from the original (now quoted so the XML is
     well-formed), but it does not look like a documented attribute of rabbit:queue.
     Confirm against the spring-rabbit XSD and drop it if validation rejects it. -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup="false"
    durable="true" />

<!-- Webapp queue (same review note about auto-startup applies). -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup="false"
    durable="true" />

<!-- Fanout exchange with both queues bound to it. A fanout exchange copies every
     message to each bound queue and ignores routing keys, so the bindings carry none. -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
        <rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Rabbit templates. The exchange attribute is the exchange NAME on the broker
     ("trashroute-exchange"), not the Spring bean id ("myExchange"). -->
<rabbit:template connection-factory="connectionFactory" exchange="trashroute-exchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="trashroute-exchange" queue="trashroute2.rabbit.queue"/>

<!-- Listener beans. -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />

<!-- JSON message converter shared by the listener container. -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />

<!-- Listener container: one listener per queue, so each listener reads only its own queue. -->
<rabbit:listener-container id="listenerContainer"
    connection-factory="connectionFactory" message-converter="jsonMessageConverter">
    <rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
    <rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>
sender-configuration.xml
<!-- Connection factory for the broker on localhost (guest/guest). -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />

<!-- Admin used to declare AMQP resources on the broker. -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- JSON converter referenced by the rabbit:template below. The original snippet
     referenced this bean id without defining it.
     NOTE(review): remove this definition if the bean is already declared elsewhere in the context. -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />

<!-- Template the Java program uses to publish. The exchange attribute is the exchange
     NAME on the broker ("trashroute-exchange"), not the bean id used in the listener config. -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="trashroute-exchange"
    message-converter="jsonMessageConverter" />

<!-- Plain-bean template. It now references the connection factory id actually declared
     above ("connectionFactory"), and the bean element is properly closed. -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
    </property>
</bean>
Listener MainConfiguration.java
/**
 * Listener-side Spring configuration: broker connection, admin, the data
 * controller bean, and the two durable queues the listeners read from.
 */
@Configuration
public class MainConfiguration {

    /** Name of the queue returned by {@link #persistenceQueue()}. */
    protected final String persistenceQueue = "trashroute.rabbit.queue";

    /** Name of the queue returned by {@link #webappQueue()}. */
    protected final String webappQueue = "trashroute2.rabbit.queue";

    /**
     * Builds the connection factory for the broker on localhost using the
     * guest/guest credentials.
     *
     * @return a caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /**
     * Creates the AMQP admin on top of {@link #connectionFactory()}.
     *
     * @return a {@code RabbitAdmin} bound to the connection factory
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        final ConnectionFactory factory = connectionFactory();
        return new RabbitAdmin(factory);
    }

    /**
     * Exposes a new {@code DataController} as a bean.
     *
     * @return the data controller instance
     */
    @Bean
    public DataController DataController() {
        return new DataController();
    }

    /**
     * Declares the persistence queue under its fixed name, with durability enabled.
     * Every queue is bound to the default direct exchange.
     *
     * @return the durable persistence queue
     */
    @Bean
    public Queue persistenceQueue() {
        final boolean durable = true;
        return new Queue(persistenceQueue, durable);
    }

    /**
     * Declares the webapp queue under its fixed name, with durability enabled.
     *
     * @return the durable webapp queue
     */
    @Bean
    public Queue webappQueue() {
        final boolean durable = true;
        return new Queue(webappQueue, durable);
    }
}
Sender SenderConfiguration.java
/**
 * Sender-side Spring configuration: broker connection, the JSON-converting
 * {@code RabbitTemplate}, the scheduled producer, and the post-processor that
 * handles scheduling annotations.
 */
@Configuration
public class SenderConfiguration {

    /**
     * Broker name of the fanout exchange declared in listener-configuration.xml
     * ({@code <rabbit:fanout-exchange name="trashroute-exchange">}).
     */
    private static final String FANOUT_EXCHANGE = "trashroute-exchange";

    protected final String persistenceQueue = "trashroute.rabbit.queue";
    protected final String webappQueue = "trashroute2.rabbit.queue";

    /**
     * Creates the template used to publish messages as JSON.
     *
     * <p>The template's default exchange is set to the fanout exchange so that
     * sends which do not name an exchange are broadcast to every queue bound to
     * it, instead of going to the broker's default exchange (where only the
     * queue whose name equals the routing key would receive the message).
     * NOTE(review): the sending code is not visible here; confirm callers do not
     * rely on default-exchange routing.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new JsonMessageConverter());
        template.setExchange(FANOUT_EXCHANGE);
        return template;
    }

    /**
     * Builds the connection factory for the broker on localhost using the
     * guest/guest credentials.
     *
     * @return a caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                "localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    /**
     * Exposes the service manager implementation as the scheduled producer bean.
     *
     * @return a new {@code ServiceManagerImpl}
     */
    @Bean
    public IServiceManager scheduledProducer() {
        return new ServiceManagerImpl();
    }

    /**
     * Registers the post-processor for scheduling annotations.
     *
     * @return a new {@code ScheduledAnnotationBeanPostProcessor}
     */
    @Bean
    public BeanPostProcessor postProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
Can anyone please tell me what I'm doing wrong? One of the two listeners works perfectly; the second never reads a message.
Upvotes: 2
Views: 10311
Reputation: 2668
Based on the scenario explained above, I have created a sample application that uses Spring Java Config.
Messages are published to the trashroute and webapp queues, and the respective receivers (persistence and webapp) receive the messages.
RabbitConfiguration.java (Contains configuration for both Sender and Receiver)
@Configuration
@EnableRabbit
public class RabbitConfiguration {
public static final String BROADCAST_TRASHROUTE_QUEUE = "trashroute.rabbit.queue";
public static final String BROADCAST_WEBAPP_QUEUE = "webapp.rabbit.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public Queue trashRouteQueue() {
return new Queue(BROADCAST_TRASHROUTE_QUEUE);
}
@Bean
public Queue webAppQueue() {
return new Queue(BROADCAST_WEBAPP_QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
@Bean
public FanoutExchange trashRouteExchange() {
FanoutExchange exchange = new FanoutExchange("trashroute");
return exchange;
}
@Bean
public Binding trashRouteBinding() {
return BindingBuilder.bind(trashRouteQueue()).to(trashRouteExchange());
}
@Bean
public Binding webAppBinding() {
return BindingBuilder.bind(webAppQueue()).to(trashRouteExchange());
}
@Bean
SimpleMessageListenerContainer persistenceListenerContainer(ConnectionFactory connectionFactory, @Qualifier("persistenceListenerAdapter") MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(trashRouteQueue(), webAppQueue());
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter persistenceListenerAdapter(PersistenceListener receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
SimpleMessageListenerContainer webAppListenerContainer(ConnectionFactory connectionFactory, @Qualifier("webAppListenerAdapter") MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(trashRouteQueue(), webAppQueue());
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter webAppListenerAdapter(WebAppListener webAppListener) {
return new MessageListenerAdapter(webAppListener, "receiveMessage");
}
@Bean
PersistenceListener persistenceListener() {
return new PersistenceListener();
}
@Bean
WebAppListener webAppListener() {
return new WebAppListener();
}
}
PersistenceListener.java
/** Receiver for the persistence side; prints each message it is handed. */
public class PersistenceListener {

    /**
     * Writes the received message to standard output, wrapped in angle brackets.
     *
     * @param message the message payload
     */
    public void receiveMessage(String message) {
        final StringBuilder line = new StringBuilder("Persistence Listener: Messsage Received <");
        line.append(message).append(">");
        System.out.println(line);
    }
}
WebAppListener.java
/** Receiver for the web-app side; prints each message it is handed. */
public class WebAppListener {

    /**
     * Writes the received message to standard output, wrapped in angle brackets.
     *
     * @param message the message payload
     */
    public void receiveMessage(String message) {
        final StringBuilder line = new StringBuilder("WebAppListener: Message Received <");
        line.append(message).append(">");
        System.out.println(line);
    }
}
Application.java
/**
 * Demo entry point: boots the Spring context, waits, sends one message per
 * queue through the {@code rabbitTemplate} bean, waits again, then closes the
 * context.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    /** Pause before sending, in milliseconds. */
    private static final long STARTUP_DELAY_MILLIS = 5000;

    /** Pause after sending and before closing the context, in milliseconds. */
    private static final long SHUTDOWN_DELAY_MILLIS = 10000;

    @Autowired
    AnnotationConfigApplicationContext context;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(Application.class, args);
    }

    /**
     * Sends one greeting per queue, using each queue's name as the first
     * argument of {@code convertAndSend}, then closes the context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a sleep is interrupted or sending fails
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("Waiting five seconds...");
        Thread.sleep(STARTUP_DELAY_MILLIS);

        System.out.println("Sending message...");
        final Object templateBean = context.getBean("rabbitTemplate");
        final RabbitTemplate sender = (RabbitTemplate) templateBean;
        sender.convertAndSend(RabbitConfiguration.BROADCAST_TRASHROUTE_QUEUE, "Hello from trashroute queue!");
        sender.convertAndSend(RabbitConfiguration.BROADCAST_WEBAPP_QUEUE, "Hello from webapp queue!");

        Thread.sleep(SHUTDOWN_DELAY_MILLIS);
        context.close();
    }
}
I hope this helps, although you would need to refactor the code before using it in production.
Upvotes: 18