of32 inc
of32 inc

Reputation: 142

Send JMS Message using Spring Integration between different spring boot application

I am trying to send Jms message using spring integration from one spring boot app (master) to the other spring boot app (worker). I am using the embedded ActiveMQ as the broker. I have done the following:

MasterConfig

@Profile("master")
@EnableBatchProcessing
@EnableBatchIntegration
public class MasterConfig {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Bean
    public MessageChannel inputChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow jmsOutboundFlow(JmsTemplate jmsTemplate){
        return IntegrationFlows.from(inputChannel())
                .handle(Jms.outboundAdapter(jmsTemplate).destination("requestQueue"))
                .get();
    }

    @Bean
    public ApplicationRunner runner(){
        return args -> {
            for(int i = 1; i <= 3; i++)
                inputChannel().send(MessageBuilder.withPayload("hello " + i).build());
        };
    }

}

WorkerConfig:

@Profile("worker")
@EnableBatchIntegration
@EnableBatchProcessing
public class WorkerConfig {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow(){
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cachingConnectionFactory).destination("requestQueue"))
                .handle(new MessageHandler() {
                    @Override
                    public void handleMessage(Message<?> message) throws MessagingException {
                        System.out.println(message.getPayload());
                    }
                })
                .get();
    }
}

CommonConfig

@Profile("worker | master")
@Configuration
@EnableIntegration
public class CommonConfig {

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        cf.setBrokerURL(brokerUrl);
        return cf;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(){
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate =
                new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setPubSubDomain(false);
        return jmsTemplate;
    }

    @Bean
    public Queue requestQueue(){
        return new ActiveMQQueue("queue.demo");
    }

    @Bean
    public Queue replyQueue(){
        return new ActiveMQQueue("queue.reply");
    }

}

dependencies in pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-ftp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

application.properties

activemq.broker-url=vm://localhost

So I compiled the project using mvn clean package command then run the jar two times each one with different spring profile set
First:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=worker
Second:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=master

But both applications just close and no message receiving or sending. What I expected is when then worker app is started it will wait for any message, and when the master app is started it will send three message and in the worker will be triggered by the incoming message and print the message.

The following is the log of what happen I started the worker and the master app:

2023-10-26 15:05:45.494  INFO 12348 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "worker"
2023-10-26 15:05:45.843  INFO 12348 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:05:45.854  INFO 12348 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:05:47.156  INFO 12348 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.156  INFO 12348 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:05:47.157  INFO 12348 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:05:47.169  INFO 12348 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 1.989 seconds (JVM running for 2.403)
2023-10-26 15:05:47.176  INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.177  INFO 12348 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:05:47.178  INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'

=============================================================================================================================================================================

2023-10-26 15:06:16.995  INFO 12528 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "master"
2023-10-26 15:06:17.349  INFO 12528 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:06:17.360  INFO 12528 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:06:18.663  INFO 12528 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.664  INFO 12528 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:06:18.664  INFO 12528 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:06:18.673  INFO 12528 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.011 seconds (JVM running for 2.425)
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'

I am currently still learning spring-integration and only followed these sources:

  1. How to send Jms message from one spring-boot application to another when both apps use embedded activemq
  2. Spring Integration Samples - JMS

Any suggestion to achieve what I expected?

UPDATE

I did some changes as follows: In the application.properties:

activemq.broker-url=tcp://localhost:61616

I add the following bean in the CommonConfig:

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        return broker;
    }

and the following dependency to the pom.xml:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
        </dependency>

But still no communication happen:

2023-10-27 15:18:19.236  INFO 24904 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "worker"
2023-10-27 15:18:19.630  INFO 24904 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:18:19.643  INFO 24904 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:18:20.129  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:18:20.171  INFO 24904 --- [           main] o.a.a.store.kahadb.MessageDatabase       : KahaDB is version 7
2023-10-27 15:18:20.286  INFO 24904 --- [           main] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:18:20.415  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:18:20.420  INFO 24904 --- [           main] o.a.a.t.TransportServerThreadSupport     : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:18:20.421  INFO 24904 --- [           main] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:18:20.421  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:18:20.422  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : For help or more information please see: http://activemq.apache.org
2023-10-27 15:18:21.314  INFO 24904 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.314  INFO 24904 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:18:21.316  INFO 24904 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.331  INFO 24904 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.423 seconds (JVM running for 2.828)
2023-10-27 15:18:21.336  INFO 24904 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:18:21.337  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:18:21.339  INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.339  INFO 24904 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:18:21.340  INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.341  INFO 24904 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:18:21.342  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async queue tasks
2023-10-27 15:18:21.343  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async topic tasks

=======================================================================================================

2023-10-27 15:20:13.451  INFO 17960 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "master"
2023-10-27 15:20:13.807  INFO 17960 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:20:13.820  INFO 17960 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:20:14.282  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:20:14.326  INFO 17960 --- [           main] o.a.a.store.kahadb.MessageDatabase       : KahaDB is version 7
2023-10-27 15:20:14.448  INFO 17960 --- [           main] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:20:14.583  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.a.a.t.TransportServerThreadSupport     : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : For help or more information please see: http://activemq.apache.org
2023-10-27 15:20:15.453  INFO 17960 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.454  INFO 17960 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:20:15.456  INFO 17960 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.469  INFO 17960 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.35 seconds (JVM running for 2.758)
2023-10-27 15:20:15.474  INFO 17960 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:20:15.475  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:20:15.476  INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.478  INFO 17960 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:20:15.479  INFO 17960 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:20:15.480  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async queue tasks
2023-10-27 15:20:15.481  INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.481  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async topic tasks
2023-10-27 15:20:15.484  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopped KahaDB

Upvotes: 0

Views: 443

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

The vm: transport is only available within the same JVM; you need to enable the tcp: transport.

See this answer: Is it possible to connect to spring boot embedded ActiveMQ instance from another application(started in separate process)?

Upvotes: 0

Related Questions