Reputation: 127
I'm looking for a way to conditionally handle messages based on the aggregation of messages. I've looked into a lot of ways to do this, but it seems that Apache Camel doesn't support it. I'll explain the scenario and then the solutions I tried.
Scenario: I'm trying to conditionally clean a directory. I poll the directory every x days and fetch all the files (file://...). I route these into an aggregator that combines the files into a single size (directorySize). I then check whether this size passes a certain threshold.
Here is where the problem lies. I now want to remove certain files if this condition passes, but I don't have access to the original messages anymore because they were aggregated in a new exchange.
Solutions:
Things I consider doing:
I'm a bit stunned that you can't elegantly filter messages based on the aggregation of these messages. Is there something that I missed in Camel that would provide an elegant solution? Or is this a case of the least bad solution?
Message(File)
Message(File) --> AggregatedMessage(directorySize) --> delete certain Files?
Message(File)
Upvotes: 1
Views: 3063
Reputation: 40428
Camel is really awesome, but sometimes it sure is difficult to see exactly which design pattern to use ;)
Firstly, you need to keep a copy of the file objects, because you don't know whether to delete them or not until you reach your threshold - there are basically (at least) two ways to do this.
Alternative 1
The first way is to use a List in an exchange property. This property will hang around no matter what you do with the exchange body. If you have a look at the source code for GroupedExchangeAggregationStrategy, it does precisely this:
list = new ArrayList<Exchange>();
answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
// ...
list.add(newExchange);
Or you could do the same thing manually on your own exchange property. In any case, it's completely fine to use the Grouped aggregation strategy as you have done.
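To make the "do it manually" idea concrete, here is a minimal plain-Java sketch of the pattern, deliberately written without any Camel types. `DirectoryAggregate`, `totalBytes`, `files`, and `add` are hypothetical names for illustration: `totalBytes` plays the role of the aggregated body, and `files` plays the role of the list kept in the exchange property.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an aggregated exchange, not a Camel class.
final class DirectoryAggregate {
    long totalBytes;                               // stands in for the aggregated body
    final List<String> files = new ArrayList<>();  // stands in for the exchange property

    // Folds one more file into the aggregate. Pass null as 'old' for the
    // first file, mirroring how an aggregation strategy sees its first message.
    static DirectoryAggregate add(DirectoryAggregate old, String fileName, long sizeBytes) {
        DirectoryAggregate answer = (old != null) ? old : new DirectoryAggregate();
        answer.totalBytes += sizeBytes;  // the body becomes the running total
        answer.files.add(fileName);      // the original is still remembered
        return answer;
    }
}
```

After all files have been folded in, the total can be compared against the threshold while every original file name is still available in `files`.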
Alternative 2
The second way to "keep" old messages is to send a copy to a stopped SEDA queue. So you would do to("seda:xyz"). You define the route that consumes from this queue as .noAutoStartup(). Then you can send messages to it and they will queue up on an internal queue, managed by Camel. When you want to process the messages, you simply start the route via controlbus and stop it again afterwards.
Generally, messing around with starting and stopping queues should be avoided unless absolutely necessary, but that's certainly another way to do it.
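The behaviour of such a parked queue can be modelled in a few lines of plain Java. This is only a sketch of the idea, not Camel's SEDA implementation; `ParkedQueue`, `send`, `drainTo`, and `discard` are hypothetical names. Messages pile up until someone explicitly drains them, which corresponds to starting the consuming route and stopping it again.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of a queue whose consumer is not started automatically.
final class ParkedQueue<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();

    // Producers can always enqueue, even though nothing is consuming yet.
    void send(T message) {
        pending.add(message);
    }

    int size() {
        return pending.size();
    }

    // "Start" the consumer: hand every queued message to the handler,
    // then fall idle again. Returns the number of messages handled.
    int drainTo(Consumer<? super T> handler) {
        int handled = 0;
        T message;
        while ((message = pending.poll()) != null) {
            handler.accept(message);
            handled++;
        }
        return handled;
    }

    // Threshold not reached: throw the parked copies away.
    void discard() {
        pending.clear();
    }
}
```

In the directory scenario, each polled file would be sent to the parked queue, and the threshold check would decide between `drainTo` (delete the files) and `discard`.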
Suggested solution
I suggest you do as you have done (i.e. alternative 1):
aggregate via GroupedExchangeAggregationStrategy to keep the individual files in a list
filter(simple("${body} < 123"))
splitter(simple("${property.CamelGroupedExchange}"))
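The last two steps can be read as follows, sketched in plain Java rather than Camel DSL. `FilterThenSplit` and `apply` are hypothetical names, and the comparison is passed in by the caller, so the sketch does not assume which direction the threshold check should go.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical helper illustrating "filter on the aggregate, then split the kept list".
final class FilterThenSplit {
    // Returns the individual files to hand downstream: the kept list when the
    // aggregated size passes the filter, otherwise nothing (the message is dropped).
    static List<String> apply(long aggregatedSize, List<String> groupedFiles, LongPredicate filter) {
        if (!filter.test(aggregatedSize)) {
            return Collections.emptyList();
        }
        // After the split, each element would travel on as its own message.
        return groupedFiles;
    }
}
```

With the predicate from the steps above, `FilterThenSplit.apply(size, files, s -> s < 123)` hands the files on only when the aggregated size is below 123.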
Please let me know if this doesn't make sense, or if I have misunderstood your problem in any way.
Upvotes: 1