Reputation: 233
I have implemented Spring Batch + Spring Integration combination with DSL referring to following link
Here is the code.
@Bean
public IntegrationFlow integrationFlowUi(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirUi)).
filter(new SimplePatternFileListFilter("*.csv")).
filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
channel("bridgeChannel").
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
with Batch config as
@Bean
public Job fundingCardActivationJob() {
return jobBuilderFactory.get("fundingCardActivationJob")
.incrementer(new RunIdIncrementer())
.flow(activationStep())
.next(fileRenamingStep())
.end()
.build();
}
Job runs. But At the end it throws following exception.
2017-12-15 12:39:54.867 WARN 13723 --- [ask-scheduler-1] headers.id + ': ' + payload : GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}]
2017-12-15 12:39:54.900 ERROR 13723 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.integrationFlowUi.channel#2'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}], failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ... 52 more
I have no idea why the exception is being thrown when integration configuration seems complete.
Upvotes: 0
Views: 331
Reputation: 174494
Something is closing (or stopping) the application context before the JobExecution
result is sent to the .log()
.
Upvotes: 0