Reputation: 2848
I am processing large files with Camel. The idea is to split the source file by lines and send each line as an individual message to an Akka component, which processes every line independently. The Akka component eventually writes an output file through another route (not relevant to this question).
The Akka component may fail while processing a given line. When that happens, I want to signal the Camel File consumer so that it stops reading the file and moves it to another location. Since the files can be large (up to 500 MB of text), I am using streaming mode and a throttler. I need the error raised in the Akka component to trigger the moveFailed option of the File consumer, but I can't get this to work.
Here is my route:
<route id="splitFile">
  <from uri="file:C:/tmp/in_test?preMove=staging&amp;move=.done&amp;moveFailed=.error&amp;readLock=changed&amp;readLockCheckInterval=1500"/>
  <split streaming="true" stopOnException="true" shareUnitOfWork="true">
    <tokenize token="\r\n" group="1"/>
    <setHeader headerName="CamelSplitIndex"> <!-- line number basically -->
      <simple>property.CamelSplitIndex</simple>
    </setHeader>
    <setHeader headerName="CamelSplitComplete"> <!-- did I read the last line? -->
      <simple>property.CamelSplitComplete</simple>
    </setHeader>
    <throttle timePeriodMillis="1000">
      <constant>850</constant>
      <to uri="vm:PROCESSOR_RECEIVER"/>
    </throttle>
  </split>
</route>
Then, when an error occurs in Akka, I report it (following the guidelines under "Delivery Acknowledgement") by replying with an akka.actor.Status.Failure message.
What I see is a detailed log entry from o.a.c.p.DefaultErrorHandler, "Failed delivery for (MessageId: ... on ExchangeId: ... )", which includes a "Message History" and details about the Exchange. So my akka.actor.Status.Failure message does seem to reach the Camel runtime, but processing of the input file does not stop: all the content of the input file is still sent to my vm:PROCESSOR_RECEIVER component, and the file is eventually moved to staging.done.
Again, what I want is for the component to stop sending content to vm:PROCESSOR_RECEIVER and for the file to be moved to staging.error.
I've tried variations on the route, including setting consumer.bridgeErrorHandler=true in the File URI options and adding an onException element to the route declaration, but I haven't been able to get the desired behaviour.
To sum up, the question is: how can I stop streaming content and trigger the moveFailed option of the File consumer from the destination of the route (particularly when the destination is an Akka Camel component)?
Upvotes: 0
Views: 1031
Reputation: 2848
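It turns out that the problem lies in using the vm: component to communicate with Akka. Since vm: is asynchronous, the route hands each message off without waiting for the result, so the exception is never propagated "upwards" to the splitter and the File consumer, and neither stopOnException nor moveFailed is triggered. Simply changing the component to direct:, which is synchronous, makes it work, i.e., instead of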
<to uri="vm:PROCESSOR_RECEIVER"/>
change to
<to uri="direct:PROCESSOR_RECEIVER"/>
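For context, here is a sketch of how the route from the question might look with that one change applied. It assumes that the Akka consumer's endpoint URI is also changed to direct:PROCESSOR_RECEIVER and that it is registered in the same CamelContext as this route (direct: does not cross contexts). The setHeader steps are left out for brevity.

```xml
<route id="splitFile">
  <from uri="file:C:/tmp/in_test?preMove=staging&amp;move=.done&amp;moveFailed=.error&amp;readLock=changed&amp;readLockCheckInterval=1500"/>
  <!-- stopOnException only helps if the failure actually reaches the splitter -->
  <split streaming="true" stopOnException="true" shareUnitOfWork="true">
    <tokenize token="\r\n" group="1"/>
    <throttle timePeriodMillis="1000">
      <constant>850</constant>
      <!-- direct: is a synchronous call, so a failure reported by the
           consumer is set on this exchange and seen by the splitter -->
      <to uri="direct:PROCESSOR_RECEIVER"/>
    </throttle>
  </split>
</route>
```

If the endpoint has to stay on vm: for some reason, the seda/vm option waitForTaskToComplete=Always may be worth trying, since it makes the producer wait for the consumer to finish. I have not tested that with this setup, so treat it as an assumption.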
Upvotes: 1