Reputation: 142
I am trying to send a JMS message using Spring Integration from one Spring Boot app (master) to another Spring Boot app (worker). I am using embedded ActiveMQ as the broker. I have done the following:
MasterConfig:
@Profile("master")
@EnableBatchProcessing
@EnableBatchIntegration
public class MasterConfig {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Bean
    public MessageChannel inputChannel() {
        return MessageChannels.direct().get();
    }
    @Bean
    public IntegrationFlow jmsOutboundFlow(JmsTemplate jmsTemplate) {
        return IntegrationFlows.from(inputChannel())
                .handle(Jms.outboundAdapter(jmsTemplate).destination("requestQueue"))
                .get();
    }
    @Bean
    public ApplicationRunner runner() {
        return args -> {
            for (int i = 1; i <= 3; i++)
                inputChannel().send(MessageBuilder.withPayload("hello " + i).build());
        };
    }
}
WorkerConfig:
@Profile("worker")
@EnableBatchIntegration
@EnableBatchProcessing
public class WorkerConfig {
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cachingConnectionFactory).destination("requestQueue"))
                .handle(new MessageHandler() {
                    @Override
                    public void handleMessage(Message<?> message) throws MessagingException {
                        System.out.println(message.getPayload());
                    }
                })
                .get();
    }
}
CommonConfig:
@Profile("worker | master")
@Configuration
@EnableIntegration
public class CommonConfig {
    @Value("${activemq.broker-url}")
    private String brokerUrl;
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        cf.setBrokerURL(brokerUrl);
        return cf;
    }
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(connectionFactory());
    }
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate =
                new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setPubSubDomain(false);
        return jmsTemplate;
    }
    @Bean
    public Queue requestQueue() {
        return new ActiveMQQueue("queue.demo");
    }
    @Bean
    public Queue replyQueue() {
        return new ActiveMQQueue("queue.reply");
    }
}
Dependencies in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-file</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-ftp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
application.properties:
activemq.broker-url=vm://localhost
I compiled the project with mvn clean package, then ran the jar twice, each time with a different Spring profile active.
First:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=worker
Second:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=master
But both applications just shut down, and no messages are sent or received. What I expected is that when the worker app starts, it waits for messages; when the master app starts, it sends three messages, and the worker is triggered by each incoming message and prints it.
The following is the log from starting the worker app and then the master app:
2023-10-26 15:05:45.494 INFO 12348 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "worker"
2023-10-26 15:05:45.843 INFO 12348 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:05:45.854 INFO 12348 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:05:47.156 INFO 12348 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.156 INFO 12348 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:05:47.157 INFO 12348 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:05:47.169 INFO 12348 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 1.989 seconds (JVM running for 2.403)
2023-10-26 15:05:47.176 INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.177 INFO 12348 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:05:47.178 INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
=============================================================================================================================================================================
2023-10-26 15:06:16.995 INFO 12528 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "master"
2023-10-26 15:06:17.349 INFO 12528 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:06:17.360 INFO 12528 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:06:18.663 INFO 12528 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.664 INFO 12528 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:06:18.664 INFO 12528 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:06:18.673 INFO 12528 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.011 seconds (JVM running for 2.425)
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
I am currently still learning spring-integration and only followed these sources:
Any suggestions on how to achieve what I expected?
I made some changes, as follows.
In application.properties:
activemq.broker-url=tcp://localhost:61616
I added the following bean to CommonConfig:
@Bean
public BrokerService broker() throws Exception {
    BrokerService broker = new BrokerService();
    broker.addConnector("tcp://localhost:61616");
    return broker;
}
and the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-kahadb-store</artifactId>
</dependency>
But still no communication happens:
2023-10-27 15:18:19.236 INFO 24904 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "worker"
2023-10-27 15:18:19.630 INFO 24904 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:18:19.643 INFO 24904 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:18:20.129 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:18:20.171 INFO 24904 --- [ main] o.a.a.store.kahadb.MessageDatabase : KahaDB is version 7
2023-10-27 15:18:20.286 INFO 24904 --- [ main] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:18:20.415 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:18:20.420 INFO 24904 --- [ main] o.a.a.t.TransportServerThreadSupport : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:18:20.421 INFO 24904 --- [ main] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:18:20.421 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:18:20.422 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
2023-10-27 15:18:21.314 INFO 24904 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.314 INFO 24904 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:18:21.316 INFO 24904 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.331 INFO 24904 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.423 seconds (JVM running for 2.828)
2023-10-27 15:18:21.336 INFO 24904 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:18:21.337 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:18:21.339 INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.339 INFO 24904 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:18:21.340 INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.341 INFO 24904 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:18:21.342 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async queue tasks
2023-10-27 15:18:21.343 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async topic tasks
=======================================================================================================
2023-10-27 15:20:13.451 INFO 17960 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "master"
2023-10-27 15:20:13.807 INFO 17960 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:20:13.820 INFO 17960 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:20:14.282 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:20:14.326 INFO 17960 --- [ main] o.a.a.store.kahadb.MessageDatabase : KahaDB is version 7
2023-10-27 15:20:14.448 INFO 17960 --- [ main] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:20:14.583 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.a.a.t.TransportServerThreadSupport : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
2023-10-27 15:20:15.453 INFO 17960 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.454 INFO 17960 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:20:15.456 INFO 17960 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.469 INFO 17960 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.35 seconds (JVM running for 2.758)
2023-10-27 15:20:15.474 INFO 17960 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:20:15.475 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:20:15.476 INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.478 INFO 17960 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:20:15.479 INFO 17960 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:20:15.480 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async queue tasks
2023-10-27 15:20:15.481 INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.481 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async topic tasks
2023-10-27 15:20:15.484 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopped KahaDB
Upvotes: 0
Views: 443
Reputation: 174554
The vm: transport is only available within the same JVM; you need to enable the tcp: transport.
See this answer: Is it possible to connect to spring boot embedded ActiveMQ instance from another application(started in separate process)?
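To make the distinction concrete, here is a minimal sketch of a startup guard that flags a vm: broker URL when the broker is meant to be shared between two separately started processes. The class and method names (BrokerUrlCheck, usesVmTransport) are hypothetical and not part of ActiveMQ or Spring. The check only looks at the URL scheme and never contacts a broker.

```java
import java.net.URI;

// Hypothetical helper, not part of ActiveMQ or Spring: inspects only the
// scheme of a broker URL. It does not open a connection.
final class BrokerUrlCheck {

    private BrokerUrlCheck() {
    }

    /** Returns true when the broker URL uses the in-JVM vm: transport. */
    static boolean usesVmTransport(String brokerUrl) {
        String scheme = URI.create(brokerUrl).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Broker URL has no scheme: " + brokerUrl);
        }
        // vm: connects to a broker inside the current JVM, so a second
        // java -jar process cannot reach it.
        return "vm".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        // The two values used for activemq.broker-url in the question.
        for (String url : new String[] {"vm://localhost", "tcp://localhost:61616"}) {
            System.out.println(url + " -> vm transport: " + usesVmTransport(url));
        }
    }
}
```

In the question's setup, a check like this could be run against the activemq.broker-url value before the connection factory is created, assuming the master and worker are always started as separate processes.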
Upvotes: 0