David Rodrigues
David Rodrigues

Reputation: 622

RX design - Throttling hits on external systems

I am writing a message broker that taps onto IBM MQs and folders on the file system. After picking up messages it then materializes them into strongly typed classes and plugs them into RX Subjects.

I have built awareness on the messages that allow me to identify which external systems need to be hit to handle them, so I can do queries on the RX observables and pick messages that do not target an external system, etc.

What I want to do next, is throttle messages by external system being hit, for example:

If I was hitting a CRM system with a certain type of message, and I would decide that I wanted to hit that system with 4 concurrent calls max, I would be handling 4 messages at a time only, if I had a 5th message I would have to wait for one of the previous 4 to be done with and then move on to the 5th. The same for other types of resources like external databases, other external web services, etc.

I have started researching on this matter, and so far the best design approach would be to write my own scheduler. The downside is that I would have to write my own internal structures that queue up messages inside the scheduler, after they are picked up, and this is where I dislike this approach.

Does anyone have a better way of doing this?

Upvotes: 1

Views: 100

Answers (3)

David Rodrigues
David Rodrigues

Reputation: 622

I also posted this same question on MSDN and got a more in depth answer with a different implementation of the Merge operator so that data loss wouldn't occur when the max concurrency values changed.

Upvotes: 0

Brandon
Brandon

Reputation: 39182

What you are describing seems to be max concurrency. The Merge operator has support for stuff like that.

You'd need to use something like GroupBy to split your stream based upon where it is going, then use Merge with a maximum concurrency on each split piece, then finally Merge the results back together. Something like this:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);

Upvotes: 1

Jim Wooley
Jim Wooley

Reputation: 10398

You may want to look into ReactiveUI which includes a rate throttling mechanism for service requests. See http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/

Upvotes: 0

Related Questions