user_1234567_java

Reputation: 53

Spring Batch and Spring Integration: cannot configure JobListener

I'm new to Spring. Recently I have been trying to make Spring Batch and Spring Integration work together. I want a JobListener that listens for messages arriving on a specific channel and launches a Spring Batch job.

I found an example on GitHub (https://github.com/chrisjs/spring-batch-scaling/tree/master/message-job-launch) and tried to wire Spring Batch and Spring Integration together in a similar way. My configuration looks like this:

<!--Incoming channel OneToOne-->
<int:channel id="requests-channel"/>

<!--For multiple consumers OneToMany-->
<int:publish-subscribe-channel id="reply-channel"/>

<!--Channel for file adapter-->
<int:channel id="file-adapter-reply-channel"/>

<int:channel id="statuses">
    <int:queue capacity="10"/>
</int:channel>


<int:channel id="jobLaunchReplyChannel"/>


<!--Intercept request-->
<int-http:inbound-gateway request-channel="requests-channel"
                          supported-methods="PUT"
                          path="/testData/setProfileDescription"
                          reply-timeout="30000"
                          reply-channel="reply-channel">
</int-http:inbound-gateway>


<!--Send the HTTP response back to the user, or 'no reply received within timeout'-->
<bean id="profileDescriptionActivator"
      class="ru.tcsbank.service.integrations.activators.ProfileDescriptionActivator"/>

<int:service-activator ref="profileDescriptionActivator"
                       input-channel="requests-channel"
                       output-channel="reply-channel"
                       method="httpMessageActivator"/>


<!--Write profile description to file-->
<bean id="custom-file-name-generator"
      class="ru.tcsbank.service.integrations.transformers_generators.ProfilesFileAdapterNameGenerator"/>
<file:outbound-channel-adapter channel="file-adapter-reply-channel"
                               directory="file:out"
                               filename-generator="custom-file-name-generator"/>


<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" lazy-init="true" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/testdb"/>
    <property name="username" value="test_user"/>
    <property name="password" value="qwerty123"/>
</bean>


<bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
    <property name="autoProxy" value="true"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<bean id="jobRepositoryInDB" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="dataSource" ref="dataSource"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>


<bean id="itemProcessor" class="ru.tcsbank.service.batch_processing.CustomItemProcessor"/>

<bean id="itemReader" class="ru.tcsbank.service.batch_processing.CustomReader" scope="step">
    <property name="resource" value="classpath:fileOut/*.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value=","/>
                    <property name="names" value="id,firstName,lastName"/>
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="ru.tcsbank.service.batch_processing.ProfileDescriptionLineMapper"/>
            </property>
        </bean>
    </property>
</bean>
<bean id="itemWriter" class="ru.tcsbank.service.batch_processing.CustomWriter"/>

<batch:job id="helloWorldJob" job-repository="jobRepositoryInDB">
    <batch:listeners>
        <batch:listener ref="jobListener"/>
    </batch:listeners>
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>


<int:transformer input-channel="reply-channel" output-channel="file-adapter-reply-channel">
    <bean class="ru.tcsbank.service.batch_processing.FileMessageToJobRequest">
        <property name="job" ref="helloWorldJob"/>
        <property name="fileParameterName" value="input.file.name"/>
    </bean>
</int:transformer>


<bean id="jobListener" class="ru.tcsbank.service.batch_processing.CustomJobExecutionListener">
    <constructor-arg index="0" ref="notificationSender"/>
</bean>

<batch-int:job-launching-gateway request-channel="reply-channel"
                                 reply-channel="file-adapter-reply-channel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

<int:channel id="notificationsChannel"/>
<int:gateway id="notificationSender"
             service-interface="ru.tcsbank.service.batch_processing.NotificationSender"
             default-request-channel="notificationsChannel"/>

As I understand it, my helloWorldJob should run when my jobListener receives a message from notificationsChannel. But it does not work (no message is received from notificationsChannel). Instead, it throws an error like:

Dispatcher has no subscribers for channel 'application.notificationsChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=TEST. Image processing job ran for: 0 minutes, 0 seconds.

Upvotes: 1

Views: 394

Answers (1)

Artem Bilan

Reputation: 121177

It's hard to understand what you would like to achieve with all this custom code, but what I can say is that there are no subscribers for notificationsChannel in your configuration. You do send messages to it via the notificationSender gateway, but you don't provide any endpoint to consume from notificationsChannel.

In the sample you link to, we have something like this:

<int-jms:outbound-channel-adapter id="notifications" destination-name="notifications"
              channel="notificationsChannel"/>

So, messages sent to notificationsChannel land in the notifications queue on the JMS broker. Your sample lacks such a subscriber. Therefore I can only explain the reason for the exception, but definitely can't tell you what to do.
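For illustration only: any endpoint that consumes from notificationsChannel gives the dispatcher a subscriber and so avoids this exception. A minimal sketch, assuming you just want the notifications written to the log instead of forwarded to JMS (the choice of a logging adapter and the INFO level are assumptions, not part of the linked sample):

```xml
<!-- Hypothetical subscriber: logs every message sent to notificationsChannel.
     Replace with whatever endpoint should really handle the notifications. -->
<int:logging-channel-adapter channel="notificationsChannel" level="INFO"/>
```

Whether logging is the right consumer depends on what the notifications are for; the point is only that the channel needs at least one subscriber.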

UPDATE

You may not need notificationSender in your solution at all. It looks like it is there only because CustomJobExecutionListener takes it as a constructor argument. So, if you don't need to listen for job progress, just remove CustomJobExecutionListener and, with it, the notificationSender declaration together with the notificationsChannel definition.

Everything else you are asking in the comments is out of scope for this SO question. Please consider raising those concerns in a separate SO thread.
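As a sketch of that removal, using the bean names from the question, the job definition would simply lose its listeners block. This assumes the jobListener bean, the notificationSender gateway and the notificationsChannel channel are all deleted from the same configuration:

```xml
<!-- helloWorldJob without CustomJobExecutionListener; the jobListener bean,
     the notificationSender gateway and notificationsChannel are removed too. -->
<batch:job id="helloWorldJob" job-repository="jobRepositoryInDB">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
```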

Upvotes: 1
