Reputation: 24561
I am trying to create an example of publish-subscribe based on the @JmsListener annotation: https://github.com/lkrnac/book-eiws-code-samples/tree/master/05-jms/0515-publish-subscribe
Relevant code snippets:
@Slf4j
@SpringBootApplication
@EnableScheduling
public class JmsPublishSubscribeApplication {
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(JmsPublishSubscribeApplication.class, args);
    }

    @Bean
    public ActiveMQTopic simpleTopic() {
        return new ActiveMQTopic("simpleTopic");
    }
}

@Component
public class SimpleMessageListener1 {
    @JmsListener(destination = "simpleTopic")
    public void readMessage(String message) {
        //....
    }
}

@Component
public class SimpleMessageListener2 {
    @JmsListener(destination = "simpleTopic")
    public void readMessage(String message) {
        //....
    }
}
The problem is that I get this behaviour:
2015-05-17 20:07:04.985 INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender : Sending message: simple message
2015-05-17 20:07:05.070 INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 2
2015-05-17 20:07:05.975 INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender : Sending message: simple message
2015-05-17 20:07:05.986 INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 1
2015-05-17 20:07:06.975 INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender : Sending message: simple message
2015-05-17 20:07:06.987 INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 2
2015-05-17 20:07:07.975 INFO 22983 --- [pool-1-thread-1] n.l.b.e.chapter05.SimpleMessageSender : Sending message: simple message
2015-05-17 20:07:07.994 INFO 22983 --- [enerContainer-1] n.l.b.e.c.JmsPublishSubscribeApplication : Message Received: simple message via listener 1
But each message should be consumed by both listeners by definition of topics. What am I missing?
Upvotes: 17
Views: 24257
Reputation: 124471
A @JmsListener uses a DefaultMessageListenerContainer, which extends JmsDestinationAccessor, which by default has pubSubDomain set to false. When this property is false, the container operates on a queue. If you want to use topics, you have to set this property to true.
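To see why the log in the question alternates between the two listeners, here is a minimal plain-Java sketch of the two delivery modes. It is a toy in-memory model, not JMS or ActiveMQ: the class DeliveryModel and its methods are hypothetical names made up for illustration, and the strict round-robin in queue mode is a simplifying assumption (a real broker only guarantees that each queued message goes to one consumer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model of queue vs. topic delivery; not a real broker. */
class DeliveryModel {
    private final boolean pubSubDomain;
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private int next = 0; // round-robin cursor, used in queue mode only

    DeliveryModel(boolean pubSubDomain) {
        this.pubSubDomain = pubSubDomain;
    }

    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    void send(String message) {
        if (listeners.isEmpty()) {
            return;
        }
        if (pubSubDomain) {
            // Topic semantics: every subscriber receives its own copy.
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        } else {
            // Queue semantics: exactly one consumer receives each message.
            listeners.get(next).accept(message);
            next = (next + 1) % listeners.size();
        }
    }

    /** Sends two messages to two listeners and returns the delivery log. */
    static List<String> demo(boolean pubSubDomain) {
        List<String> log = new ArrayList<>();
        DeliveryModel destination = new DeliveryModel(pubSubDomain);
        destination.subscribe(m -> log.add(m + " via listener 1"));
        destination.subscribe(m -> log.add(m + " via listener 2"));
        destination.send("message A");
        destination.send("message B");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("queue: " + demo(false));
        System.out.println("topic: " + demo(true));
    }
}
```

With pubSubDomain set to false, each message appears once in the log and the listeners take turns, which is the pattern in the question. With true, each message appears twice, once per listener.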
As you are using Spring Boot, you can quite easily set this property by adding spring.jms.pub-sub-domain to application.properties and setting it to true:
spring.jms.pub-sub-domain=true
By default, @JmsListener looks for a bean named jmsListenerContainerFactory; Spring Boot registers one automatically if you have not defined your own. You can also define your own bean and programmatically set this property to true:
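@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // The factory needs a connection factory to create its containers
    factory.setConnectionFactory(connectionFactory);
    // Listen on topics instead of queues
    factory.setPubSubDomain(true);
    // Other configuration here
    return factory;
}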
This works as well, but it takes more effort. More information can be found in the documentation of the @EnableJms annotation.
Upvotes: 41
Reputation: 2215
Switching the default destination type of a @JmsListener from Queue to Topic can be done entirely in Java, without modifying the properties or using XML.
The Spring guide contains an example of customizing the default settings provided by DefaultMessageListenerContainer.
It requires defining a custom bean as follows:
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // This provides all of Boot's defaults to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's defaults if necessary.
    factory.setPubSubDomain(true);
    return factory;
}
This can then be used in the @JmsListener annotated method:
@JmsListener(destination = "mailbox", containerFactory = "myFactory")
public void receiveMessage(Email email) {
    // implementation
}
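One thing to watch with a Java-only listener factory: JmsTemplate also extends JmsDestinationAccessor, so its own pubSubDomain flag stays false unless it is changed too, and a send by destination name would then resolve to a queue. The sketch below sidesteps that by sending to a Topic object directly. It is an illustration only: the class name TopicSender is hypothetical, it assumes a Topic bean such as the simpleTopic bean from the question and scheduling enabled as in the question, and it uses the javax.jms package (newer Spring versions use jakarta.jms instead).

```java
import javax.jms.Topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical sender; assumes a Topic bean is defined in the application.
@Component
public class TopicSender {
    private final JmsTemplate jmsTemplate;
    private final Topic simpleTopic;

    @Autowired
    public TopicSender(JmsTemplate jmsTemplate, Topic simpleTopic) {
        this.jmsTemplate = jmsTemplate;
        this.simpleTopic = simpleTopic;
    }

    @Scheduled(fixedRate = 1000)
    public void send() {
        // Passing the Destination object skips name resolution, so the
        // template's pubSubDomain setting does not decide queue vs. topic.
        jmsTemplate.convertAndSend(simpleTopic, "simple message");
    }
}
```

Alternatively, calling setPubSubDomain(true) on the JmsTemplate lets sends by destination name resolve to topics as well.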
Upvotes: 10