Reputation: 3356
I have a requirement to receive a zip file over SFTP. On receipt, we should archive the file as it is and also process its contents after unzipping it. Below is the code for the main flow and the sub-flow. agentDataArchiveChannelAdapter() works fine, but for the other channel I get the error below. What is causing the error, and how can it be fixed? My assumption was that surancebayAgentDemographicFlow() would put records into the direct channel and processing would then continue as defined in that flow.
Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
2018-09-24 12:16:22.004 DEBUG 17536 --- [ask-scheduler-2] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.thirdpartyAgentDemographicFlow-Processing'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], failedMessage=GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}], headers={id=9e342354-8436-e1de-774e-937c8b6809d5, timestamp=1537816582001}] for original GenericMessage [payload=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, headers={file_originalFile=C:\thirdparty\input\thirdpartyAL_20180921_215000.zip, id=1a721c73-92fe-cabc-c8a3-cb71c72ab07d, file_name=thirdpartyAL_20180921_215000.zip, file_relativePath=thirdpartyAL_20180921_215000.zip, timestamp=1537816555968}]
Code/Integration flow
@Bean("sftpAgentInboundFlow")
public IntegrationFlow sftpAgentInboundFlow(SessionFactory<LsEntry> sftpSessionFactory) {
    return IntegrationFlows
            .from(Sftp.inboundAdapter(sftpSessionFactory)
                            .deleteRemoteFiles(false)
                            .preserveTimestamp(true)
                            .remoteDirectory(agentRemoteDir)
                            .filter(new AcceptOnceFileListFilter<>())
                            .regexFilter(".*\\.zip$")
                            .localDirectory(new File(inputDir))
                            .autoCreateLocalDirectory(true)
                            .maxFetchSize(1),
                    consumer -> consumer.id("sftpInboundAdapter")
                            .autoStartup(false)
                            .poller(Pollers.fixedDelay(scanFrequency, TimeUnit.SECONDS)))
            .publishSubscribeChannel(pubSub -> pubSub
                    .id("AgentInboundDemographic-PubSub-Channel")
                    .subscribe(flow -> flow.bridge(e -> e.id("ziparchiver")).handle(agentDataArchiveChannelAdapter()))
                    .subscribe(surancebayAgentDemographicFlow())
            )
            .get();
}
//@Bean("surancebayAgentDemographicFlow")
public IntegrationFlow surancebayAgentDemographicFlow() {
    return IntegrationFlows
            //.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)/*.maxMessagesPerPoll(corepoolsize)*/))
            .from(MessageChannels.direct("thirdpartyAgentDemographicFlow-Processing"))
            .transform(unZipTransformer())
            .split(splitter())
            .channel(MessageChannels.executor(taskExecutor()))
            .<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                    .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                    .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
            )
            .aggregate(aggregator -> aggregator.groupTimeout(messageGroupWaiting).correlationStrategy(new CorrelationStrategy() {
                @Override
                public Object getCorrelationKey(Message<?> message) {
                    return "processdate";
                }
            }).sendPartialResultOnExpiry(true))
            .handle("agentDemograpicOutput", "generateAgentDemographicFile")
            .channel(confirmChannel())
            .get();
}
Upvotes: 0
Views: 1239
Reputation: 121272
OK! I think the problem is that you are using a Spring Integration version in which the ability to use an external IntegrationFlow as a sub-flow is not implemented yet. Either upgrade to the latest version, or, as a workaround, use .subscribe("thirdpartyAgentDemographicFlow-Processing") and uncomment the @Bean annotation on the surancebayAgentDemographicFlow definition.
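A minimal sketch of how that workaround could look, under some assumptions: the second subscriber is written here as a small sub-flow that only forwards to the named channel (flow -> flow.channel(...)), which is one reading of the workaround rather than a confirmed API form; hypotheticalInboundChannel and the println handlers are placeholders standing in for the SFTP adapter, the archiver and the unzip/route/aggregate steps from the question. The important part is that the processing flow is registered as its own @Bean, so the "thirdpartyAgentDemographicFlow-Processing" channel actually gets a subscriber.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@EnableIntegration
public class PubSubWorkaroundSketch {

    // Main flow: every message is published to both subscribers.
    // "hypotheticalInboundChannel" is a placeholder for the SFTP inbound adapter.
    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows
                .from("hypotheticalInboundChannel")
                .publishSubscribeChannel(pubSub -> pubSub
                        // Placeholder for the archive step.
                        .subscribe(flow -> flow.handle(m ->
                                System.out.println("archive: " + m.getPayload())))
                        // Forward to the processing flow by channel name
                        // instead of passing the IntegrationFlow object.
                        .subscribe(flow -> flow.channel(
                                "thirdpartyAgentDemographicFlow-Processing")))
                .get();
    }

    // Processing flow: registered as a bean so that it subscribes
    // to the named channel when the application context starts.
    @Bean
    public IntegrationFlow processingFlow() {
        return IntegrationFlows
                .from("thirdpartyAgentDemographicFlow-Processing")
                // Placeholder for unzip, split, route and aggregate.
                .handle(m -> System.out.println("process: " + m.getPayload()))
                .get();
    }
}
```

With both flows registered as beans, the name "thirdpartyAgentDemographicFlow-Processing" should resolve to the same channel in both places, so the dispatcher has a subscriber when the pub-sub channel sends to it.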
Upvotes: 1