Reputation: 9639
I would like to know if it's possible with Camel to do throttling based on the content of the exchange.
The situation is the following: I have to call a webservice via soap. Among, the parameters sent to that webservice there is a customerId. The problem is that the webservice send back an error if there are more than 1 request per minute for a given customerId.
I'm wondering if it would be possible to implement throttling per customerId with Camel. So the throttling should not be implemented for all messages but only for messages with the same customerId.
Let me know how I could implement this or if I need to clarify my question.
Upvotes: 7
Views: 3179
Reputation: 71
I came across a similar problem and finally came up with the solution described here.
My assumptions are:
The solution approach:
Java DSL version is a bit easier to understand:
final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
.accumulateInCollection(ArrayList.class);
from("direct:start")
.log("Receiving ${body}")
.aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
.log("Aggregate: releasing ${body}")
.split(body())
.choice()
.when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
.log("*** Processing: ${body}")
.to("mock:result")
.otherwise()
.to("seda:delay")
.endChoice();
from("seda:delay")
.delay(0)
.to("direct:start");
Spring XML version looks like the following:
<!-- this is our aggregation strategy defined as a spring bean -->
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
<constructor-arg value="java.util.ArrayList" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<log message="Receiving ${body}"/>
<aggregate strategyRef="_flexible2" completionTimeout="60000" >
<correlationExpression>
<xpath>/order/@customerID</xpath>
</correlationExpression>
<log message="Aggregate: releasing ${body}"/>
<split>
<simple>${body}</simple>
<choice>
<when>
<simple>${header.CamelSplitIndex} == 0</simple>
<log message="*** Processing: ${body}"/>
<to uri="mock:result"/>
</when>
<otherwise>
<log message="--- Delaying: ${body}"/>
<to uri="seda:delay" />
</otherwise>
</choice>
</split>
</aggregate>
</route>
<route>
<from uri="seda:delay"/>
<to uri="direct:start"/>
</route>
</camelContext>
Upvotes: 2
Reputation: 21
While ActiveMQ Message Groups would definitely address the parallel processing of unique customer ID's, in my assessment Claus is correct that introducing a throttle for each unique group represents an unimplemented feature for Camel/ActiveMQ.
Message Groups alone will not meet the SLA described. While each group of messages (correlated by the customer ID) will be processed in order with one thread per group, as long as requests take less than a minute to receive a response, the requirement of one request per minute per customer would not be enforced.
That said, I would be very interested to know if it would be possible to combine Message Groups and a throttle strategy in a way that would simulate the feature request in JIRA. My attempts so far have failed. I was thinking something along these lines:
<route>
<from uri="activemq:pending?maxConcurrentConsumers=10"/>
<throttle timePeriodMillis="60000">
<constant>1</constant>
<to uri="mock:endpoint"/>
</throttle>
</route>
However, the throttle seems to be applied to the entire set of requests moving to the endpoint, and not to each individual consumer. I have to admit, I was a bit surprised to find that behavior. My expectation was that the throttle would apply to each consumer individually, which would satisfy the SLA in the original question, provided that the messages include the customer ID in the JMSXGroupId header.
Upvotes: 2
Reputation: 21015
ActiveMQ Message Groups is designed to handle this case. So, if you can introduce a JMS queue hop in your route, then just set the JMSXGroupId header to the customerId. Then in another route, you can consume from this queue and send to your web service to get the behavior you described.
also see http://camel.apache.org/parallel-processing-and-ordering.html for more information...
Upvotes: 2