Frederic Close

Reputation: 9639

Throttling based on content

I would like to know if it's possible with Camel to do throttling based on the content of the exchange.

The situation is the following: I have to call a webservice via SOAP. Among the parameters sent to that webservice there is a customerId. The problem is that the webservice sends back an error if there is more than one request per minute for a given customerId.

I'm wondering if it would be possible to implement throttling per customerId with Camel, so that the throttling is not applied to all messages but only to messages with the same customerId.

Let me know how I could implement this or if I need to clarify my question.

Upvotes: 7

Views: 3179

Answers (3)

Mike Klimentiev

Reputation: 71

I came across a similar problem and finally came up with the solution described here.

My assumptions are:

  • Order of messages is not important (though that could be handled with a Resequencer)
  • The total volume of messages per customer ID is not so great that the runtime gets saturated.

The solution approach:

  • Run an aggregator for 1 minute, using customerID as the correlation key to assemble messages with the same customer ID into a list
  • Use a Splitter to split the list into individual messages
  • Send the first message from the splitter to the actual service
  • Re-route the rest of the list back into the aggregator.

The Java DSL version is a bit easier to understand:

// Flexible aggregation strategy that collects the bodies sharing the same
// correlation key into an ArrayList
final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
        .accumulateInCollection(ArrayList.class);

// Aggregate per customerID for one minute, then split the released list:
// process the first element and feed the rest back into the aggregator
from("direct:start")
    .log("Receiving ${body}")
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
        .log("Aggregate: releasing ${body}")
        .split(body())
        .choice()
            .when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
                .log("*** Processing: ${body}")
                .to("mock:result")
            .otherwise()
                .to("seda:delay")
        .endChoice();

// Everything but the first message re-enters the aggregator and waits for
// the next one-minute window
from("seda:delay")
    .delay(0)
    .to("direct:start");

Spring XML version looks like the following:

<!-- this is our aggregation strategy defined as a spring bean -->
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
    <constructor-arg value="java.util.ArrayList"/>
</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <log message="Receiving ${body}"/>
        <aggregate strategyRef="_flexible2" completionTimeout="60000">
            <correlationExpression>
                <xpath>/order/@customerID</xpath>
            </correlationExpression>
            <log message="Aggregate: releasing ${body}"/>
            <split>
                <simple>${body}</simple>
                <choice>
                    <when>
                        <simple>${header.CamelSplitIndex} == 0</simple>
                        <log message="*** Processing: ${body}"/>
                        <to uri="mock:result"/>
                    </when>
                    <otherwise>
                        <log message="--- Delaying: ${body}"/>
                        <to uri="seda:delay"/>
                    </otherwise>
                </choice>
            </split>
        </aggregate>
    </route>

    <route>
        <from uri="seda:delay"/>
        <to uri="direct:start"/>
    </route>
</camelContext>

Upvotes: 2

David Gordon

Reputation: 21

While ActiveMQ Message Groups would definitely address the parallel processing of unique customer IDs, in my assessment Claus is correct that introducing a throttle for each unique group represents an unimplemented feature for Camel/ActiveMQ.

Message Groups alone will not meet the SLA described. Each group of messages (correlated by the customer ID) will be processed in order with one thread per group, but the requirement of one request per minute per customer is not enforced as long as requests take less than a minute to receive a response: a service that answers in a few seconds would still see many requests per minute for the same customer.

That said, I would be very interested to know if it would be possible to combine Message Groups and a throttle strategy in a way that would simulate the feature request in JIRA. My attempts so far have failed. I was thinking something along these lines:

<route>
  <from uri="activemq:pending?maxConcurrentConsumers=10"/>
  <throttle timePeriodMillis="60000">
    <constant>1</constant>
    <to uri="mock:endpoint"/>
  </throttle>
</route>

However, the throttle seems to be applied to the entire set of requests moving to the endpoint, and not to each individual consumer. I have to admit, I was a bit surprised to find that behavior. My expectation was that the throttle would apply to each consumer individually, which would satisfy the SLA in the original question, provided that the messages include the customer ID in the JMSXGroupId header.

Upvotes: 2

Ben ODay

Reputation: 21015

The ActiveMQ Message Groups feature is designed to handle this case. So, if you can introduce a JMS queue hop in your route, just set the JMSXGroupId header to the customerId. Then, in another route, you can consume from this queue and send to your web service to get the behavior you described.
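As an illustration only (not code from this answer), a minimal Java DSL sketch of that wiring might look like the following. It assumes an "activemq" component is configured; the endpoint URIs, the customerId header name, and the CXF endpoint are placeholders for whatever your route actually uses:

import org.apache.camel.builder.RouteBuilder;

public class MessageGroupRoutes extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:incoming")
            // ActiveMQ pins all messages with the same JMSXGroupID to a single consumer,
            // so the customerId becomes the group id (header name is a placeholder)
            .setHeader("JMSXGroupID", header("customerId"))
            .to("activemq:queue:pending");

        // One consumer per message group; messages within a group stay in order
        from("activemq:queue:pending?maxConcurrentConsumers=10")
            .to("cxf:bean:customerWebService"); // placeholder for the real SOAP endpoint
    }
}

As the other answer notes, this serializes the calls per customer but does not by itself enforce the one-request-per-minute limit.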

Also see http://camel.apache.org/parallel-processing-and-ordering.html for more information.

Upvotes: 2
