Reputation: 126
I'm currently stuck downloading files from an FTP server because of a type conversion error.
The route shown below is called from a pipeline in which my custom POJO is already set as the body. The POJO is as follows:
public class DirectoryLocationListing {
    private String domainName;
    private String countryCode;
    private String productName;
    private String profileId;
    private String directoryLocation;
    // getters and setters omitted
}
The SFTP route is shown below. In it I fetch all files from a particular directory on the server and store them under a local file path.
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<from uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${body.directoryLocation}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
While running the app I get the exception below, which I don't think I should be getting: the from entry is supposed to pick up the files and the to section is supposed to store them. Instead, it seems that the to section is using the body that was passed into the route rather than the files fetched from the FTP server.
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 121]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 6]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 1]
[_route2 ] [_log6 ] [log ] [ 1]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 3]
[route5 ] [to1 ] [file://C:/example/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 2]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-59483-1550660613598-0-7
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-59483-1550660613598-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, fireTime=Wed Feb 20 16:34:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@65314fe3, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Wed Feb 20 16:35:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Wed Feb 20 16:34:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@59c9a1bf, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Wed Feb 20 16:35:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType com.example.cdr.model.DirectoryLocationListing
Body com.example.cdr.model.DirectoryLocationListing@23a2ecf4
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp\IN\healthyindia\apj\SERVICEID
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)
at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:590)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:518)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)
at org.apache.camel.processor.Splitter.process(Splitter.java:104)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:289)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2 of type: com.EXAMPLE.cdr.model.DirectoryLocationListing on: Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Caused by: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Exchange[Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:273)
... 52 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:177)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)
Please advise if anybody has run into the same issue. Type conversion might be required, but I don't see why it would be needed in this case.
**Update**
After switching to pollEnrich, my route now looks like this:
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<process ref="sftpGetDirLocation"/>
<log message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${exchangeProperty.ftpGetDirectory}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}" timeout="0"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
And the processor I'm referring to here is:
@Component("sftpGetDirLocation")
public class SFTPGetDirLocation implements Processor{
public void process(Exchange exchange) throws Exception {
exchange.setProperty("ftpGetDirectory", exchange.getIn().getBody(DirectoryLocationListing.class).getDirectoryLocation());
}
}
My assumption after this change was that pollEnrich would take whatever it got from the SFTP URI (presumably files) and set it as the body. Instead, I see that the body is null:
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 657]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 572]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 2]
[_route2 ] [_log6 ] [log ] [ 7]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 523]
[route5 ] [process1 ] [ref:sftpGetDirLocation ] [ 0]
[route5 ] [log1 ] [log ] [ 1]
[route5 ] [pollEnrich1 ] [pollEnrich[sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/${exchangeP] [ 514]
[route5 ] [log2 ] [log ] [ 0]
[route5 ] [to1 ] [file://C:/EXAMPLE/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-57920-1550721464331-0-4
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-57920-1550721464331-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, CamelToEndpoint=sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/$%7BexchangeProperty.ftpGetDirectory%7D/?noop=true&password=tcpip123&streamDownload=true&username=admin, fireTime=Thu Feb 21 09:28:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@363935fd, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Thu Feb 21 09:29:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Thu Feb 21 09:28:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@5bf7fd08, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Thu Feb 21 09:29:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType null
Body [Body is null]
]
And thus the exception is
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot write null body to file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:205)
Upvotes: 2
Views: 6028
Reputation: 126
In the end, the following Camel context route worked for me, with guidance from shelldragon:
<route id="route5">
<from id="_from5" uri="direct:sftpGetCDRs"/>
<process id="_process1" ref="sftpGetDirLocation"/>
<log id="_log17" message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich id="_pollEnrich1" timeout="0" uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/temp/${body.directoryLocation}/?consumer.delay=60000&username={{USER}}&password={{PASSWD}}"/>
<log id="_log18" message="${body}"/>
<to id="_to2" uri="file://{{DB_DIR_LOC}}/temp/?fileName=${exchangeProperty.ftpGetDirectory}&autoCreate=true"/>
</route>
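One caveat about timeout="0" in this route: as far as I understand the pollEnrich documentation, a timeout of 0 polls once without waiting and leaves the body null when nothing is available at that moment, -1 waits until a message arrives, and a positive value waits that many milliseconds. The sketch below is only an analogy built on java.util.concurrent.BlockingQueue, not Camel code. PollTimeoutSketch and its poll helper are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a queue stands in for the remote directory being polled.
class PollTimeoutSketch {

    // timeoutMillis < 0  : block until something is available
    // timeoutMillis == 0 : return immediately, null if nothing is there
    // timeoutMillis > 0  : wait at most that long, then give up with null
    static String poll(BlockingQueue<String> source, long timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                return source.take();
            }
            if (timeoutMillis == 0) {
                return source.poll();
            }
            return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> files = new LinkedBlockingQueue<>();
        System.out.println(poll(files, 0)); // nothing queued yet, prints null
        files.add("test.txt");
        System.out.println(poll(files, 0)); // prints test.txt
    }
}
```

If the body still turns out null with this route, a positive timeout is probably worth trying before looking elsewhere.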
Upvotes: 0
Reputation: 1722
There are three problems in the code as I see it.
1. You are sending an exchange to `route5` with a body of type `DirectoryLocationListing`, and you expect the `sftp` component to use that message through a second `from` definition. It won't work. When the SFTP component downloads files, the body of the exchange will be replaced by the real file (of type `org.apache.camel.component.file.GenericFile`) and you'll lose the `DirectoryLocationListing` object you had in the previous exchange. The Content Enricher EIP is a good option if your POJO can be kept somewhere other than the body.
2. In the current situation, at the end of the route you expect a file from the `sftp` endpoint, but what is actually delivered is an exchange with a body of type `DirectoryLocationListing` (your SFTP component did not run). There's no way for Camel to magically turn that into file content and persist it to disk. That is exactly what it is complaining about.
2.1. Once you fix this issue (and your SFTP component starts doing its job), you'll run into the next problem: the message delivered to your `to` endpoint is now a file, so you need to somehow keep `directoryLocation` in the exchange until the end of the route.
3. The `fileName` parameter in your `to` definition is pointing to a directory. It won't work. You'll have to use a dynamic endpoint definition.
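The first two points can be pictured with a toy model. This is a minimal sketch, not Camel's classes: ToyExchange, enrichWithFile and targetAfterEnrich are hypothetical names, and a plain String stands in for the POJO. It only shows that a value copied into a property before the body is replaced is still available afterwards.

```java
import java.util.HashMap;
import java.util.Map;

class EnrichSketch {

    // Toy stand-in for an exchange: one body plus a map of properties.
    static class ToyExchange {
        Object body;
        final Map<String, Object> properties = new HashMap<>();
    }

    // Mimics a polling enrichment: the polled file content replaces the body,
    // while the properties are left untouched.
    static void enrichWithFile(ToyExchange exchange, String fileContent) {
        exchange.body = fileContent;
    }

    // Copies the directory into a property, enriches, then builds the target name.
    static String targetAfterEnrich(String directoryLocation, String fileName, String fileContent) {
        ToyExchange exchange = new ToyExchange();
        exchange.body = directoryLocation; // stands in for the POJO body
        exchange.properties.put("WriteTargetDirectory", exchange.body); // copy before it is lost
        enrichWithFile(exchange, fileContent); // the body is now the file
        return exchange.properties.get("WriteTargetDirectory") + fileName;
    }

    public static void main(String[] args) {
        // prints IN/healthyindia/test.txt
        System.out.println(targetAfterEnrich("IN/healthyindia/", "test.txt", "cdr-data"));
    }
}
```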
I am not familiar with Camel's XML DSL, so I'll show how this can be done with the Java DSL. I'll leave out SFTP for brevity and use a `file` component instead. We'll try to fix all these issues in one go.
Route Definition
from("direct:test")
    .routeId("route5")
    // Just to see if we can read the directoryLocation property from the POJO
    .log("=> body.directoryLocation is: ${body.directoryLocation}")
    // Move the directoryLocation property to an exchange property named WriteTargetDirectory
    .process(exchange -> exchange.setProperty("WriteTargetDirectory",
        exchange.getIn().getBody(DirectoryListing.class).getDirectoryLocation()))
    // Poll enrich magic here!
    .pollEnrich()
        .simple("file://source/${exchangeProperty.WriteTargetDirectory}/?noop=true")
    .log("=> I still have the target directory location as : ${exchangeProperty.WriteTargetDirectory}")
    .log("=> I just received file [${in.header." + Exchange.FILE_NAME + "}] and will write it as [${exchangeProperty.WriteTargetDirectory}/${in.header." + Exchange.FILE_NAME + "}]")
    .toD("file://destination/${exchangeProperty.WriteTargetDirectory}/");
DirectoryListing class
public class DirectoryListing {
    String directoryLocation;

    public DirectoryListing(String directoryLocation) {
        this.directoryLocation = directoryLocation;
    }

    public String getDirectoryLocation() {
        return directoryLocation;
    }
}
Route test code
template.sendBody("direct:test",new DirectoryListing("IN/healthyindia/"));
Log from the test
[ main] DefaultCamelContext INFO Apache Camel 2.23.1 (CamelContext: camel-1) started in 0.203 seconds
[ main] route5 INFO => body.directoryLocation is: IN/healthyindia/
[ main] FileEndpoint INFO Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[ main] FileEndpoint INFO Using default memory based idempotent repository with cache max size: 1000
[ main] route5 INFO => I still have the target directory location as : IN/healthyindia/
[ main] route5 INFO => I just received file [test.txt] and will write it as [destination/IN/healthyindia//test.txt]
[ main] MainRouteBuilderTest INFO ********************************************************************************
[ main] MainRouteBuilderTest INFO Testing done: testRouteBuilder(JustTest.MainRouteBuilderTest)
[ main] MainRouteBuilderTest INFO Took: 0.125 seconds (125 millis)
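The doubled slash in the "will write it as" log line appears because directoryLocation already ends with a slash and the log message adds another one. That should be harmless on most file systems. If you ever build such a path yourself, java.nio.file collapses the redundant separator. The sketch below is plain JDK code, not part of the route, and TargetPathSketch is a hypothetical name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class TargetPathSketch {

    // Joins base directory, relative directory and file name into one path.
    // Redundant or trailing separators in the parts are dropped by the Path parser.
    static Path target(String baseDir, String directoryLocation, String fileName) {
        return Paths.get(baseDir, directoryLocation, fileName);
    }

    public static void main(String[] args) {
        Path p = target("destination", "IN/healthyindia/", "test.txt");
        System.out.println(p.getNameCount()); // prints 4
        System.out.println(p.getFileName());  // prints test.txt
    }
}
```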
Upvotes: 2