Reputation: 53
I'm new to Spring. Recently I have been trying to make Spring Batch and Spring Integration work together. I want a JobListener that listens for messages arriving on a specific channel and launches a Spring Batch job.
I found an example on GitHub (https://github.com/chrisjs/spring-batch-scaling/tree/master/message-job-launch) and tried to configure Spring Batch and Spring Integration together in a similar way. My configuration looks like this:
<!--Incoming channel OneToOne-->
<int:channel id="requests-channel"/>
<!--For multiple consumers OneToMany-->
<int:publish-subscribe-channel id="reply-channel"/>
<!--Channel for file adapter-->
<int:channel id="file-adapter-reply-channel"/>
<int:channel id="statuses">
<int:queue capacity="10"/>
</int:channel>
<int:channel id="jobLaunchReplyChannel"/>
<!--Intercept request-->
<int-http:inbound-gateway request-channel="requests-channel"
supported-methods="PUT"
path="/testData/setProfileDescription"
reply-timeout="30000"
reply-channel="reply-channel">
</int-http:inbound-gateway>
<!--Sends the HTTP response back to the user, or 'no reply received within timeout'-->
<bean id="profileDescriptionActivator"
class="ru.tcsbank.service.integrations.activators.ProfileDescriptionActivator"/>
<int:service-activator ref="profileDescriptionActivator"
input-channel="requests-channel"
output-channel="reply-channel"
method="httpMessageActivator"/>
<!--Write profile description to file-->
<bean id="custom-file-name-generator"
class="ru.tcsbank.service.integrations.transformers_generators.ProfilesFileAdapterNameGenerator"/>
<file:outbound-channel-adapter channel="file-adapter-reply-channel"
directory="file:out"
filename-generator="custom-file-name-generator"/>
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" lazy-init="true" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/testdb"/>
<property name="username" value="test_user"/>
<property name="password" value="qwerty123"/>
</bean>
<bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
<property name="autoProxy" value="true"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="jobRepositoryInDB" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="itemProcessor" class="ru.tcsbank.service.batch_processing.CustomItemProcessor"/>
<bean id="itemReader" class="ru.tcsbank.service.batch_processing.CustomReader" scope="step">
<property name="resource" value="classpath:fileOut/*.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value=","/>
<property name="names" value="id,firstName,lastName"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="ru.tcsbank.service.batch_processing.ProfileDescriptionLineMapper"/>
</property>
</bean>
</property>
</bean>
<bean id="itemWriter" class="ru.tcsbank.service.batch_processing.CustomWriter"/>
<batch:job id="helloWorldJob" job-repository="jobRepositoryInDB">
<batch:listeners>
<batch:listener ref="jobListener"/>
</batch:listeners>
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter" processor="itemProcessor" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
<int:transformer input-channel="reply-channel" output-channel="file-adapter-reply-channel">
<bean class="ru.tcsbank.service.batch_processing.FileMessageToJobRequest">
<property name="job" ref="helloWorldJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<bean id="jobListener" class="ru.tcsbank.service.batch_processing.CustomJobExecutionListener">
<constructor-arg index="0" ref="notificationSender"/>
</bean>
<batch-int:job-launching-gateway request-channel="reply-channel"
reply-channel="file-adapter-reply-channel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
<int:channel id="notificationsChannel"/>
<int:gateway id="notificationSender"
service-interface="ru.tcsbank.service.batch_processing.NotificationSender"
default-request-channel="notificationsChannel"/>
I expect my helloWorldJob to run when (if I understand correctly) my jobListener receives a message from notificationsChannel. But it does not work: no message is received from notificationsChannel. Instead, it throws an error like:
Dispatcher has no subscribers for channel 'application.notificationsChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=TEST. Image processing job ran for: 0 minutes, 0 seconds.
Upvotes: 1
Views: 394
Reputation: 121177
It's hard to understand what you would like to achieve with all this custom code, but what I can say is that there are no subscribers for notificationsChannel in your configuration. You do send messages to it via the notificationSender gateway, but you don't provide any endpoint that consumes from notificationsChannel.
In the sample you link to, we have something like this:
<int-jms:outbound-channel-adapter id="notifications" destination-name="notifications"
channel="notificationsChannel"/>
So messages sent to notificationsChannel land in the notifications queue on the JMS broker. Your sample is lacking such a subscriber. Therefore I can only explain the reason for the exception, but definitely can't tell you what to do.
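Purely to illustrate what "a subscriber" means here, and not as a recommendation for your design: any endpoint whose input is notificationsChannel would satisfy the dispatcher. A minimal sketch, assuming you only want to see the notifications in the log (the `level` value is my assumption; the same adapter type is already used in your configuration for jobLaunchReplyChannel):

```xml
<!-- Hypothetical subscriber: logs every message sent to notificationsChannel. -->
<!-- With at least one endpoint consuming the channel, the dispatcher has a subscriber. -->
<int:logging-channel-adapter channel="notificationsChannel" level="INFO"/>
```

Whether logging, JMS, or something else is the right destination depends on what the notifications are for.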
UPDATE
You may not need notificationSender in your solution at all. It looks like it is there only because of the CustomJobExecutionListener. So, if you don't need to listen for job progress, just remove that CustomJobExecutionListener and, with it, the notificationSender declaration together with the notificationsChannel definition.
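As a sketch of that removal, assuming nothing else in your application references the listener or the gateway, the job definition from the question would shrink to something like this:

```xml
<!-- Job without the <batch:listeners> element. -->
<!-- The jobListener bean, the notificationSender gateway and the -->
<!-- notificationsChannel definition are deleted from the configuration as well. -->
<batch:job id="helloWorldJob" job-repository="jobRepositoryInDB">
    <batch:step id="step1">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
                         processor="itemProcessor" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
```

With no gateway sending to notificationsChannel, the "Dispatcher has no subscribers" error for that channel can no longer occur.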
Everything else you are asking in the comments is out of the scope of this SO question. Please consider raising those concerns in a separate SO thread.
Upvotes: 1