Reputation: 622
I am writing a message broker that taps onto IBM MQs and folders on the file system. After picking up messages it then materializes them into strongly typed classes and plugs them into RX Subjects.
I have built awareness on the messages that allow me to identify which external systems need to be hit to handle them, so I can do queries on the RX observables and pick messages that do not target an external system, etc.
What I want to do next, is throttle messages by external system being hit, for example:
If I was hitting a CRM system with a certain type of message, and I would decide that I wanted to hit that system with 4 concurrent calls max, I would be handling 4 messages at a time only, if I had a 5th message I would have to wait for one of the previous 4 to be done with and then move on to the 5th. The same for other types of resources like external databases, other external web services, etc.
I have started researching on this matter, and so far the best design approach would be to write my own scheduler. The downside is that I would have to write my own internal structures that queue up messages inside the scheduler, after they are picked up, and this is where I dislike this approach.
Does anyone have a better way of doing this?
Upvotes: 1
Views: 100
Reputation: 622
I also posted this same question on MSDN and got a more in depth answer with a different implementation of the Merge operator so that data loss wouldn't occur when the max concurrency values changed.
Upvotes: 0
Reputation: 39182
What you are describing seems to be max concurrency. The Merge
operator has support for stuff like that.
You'd need to use something like GroupBy
to split your stream based upon where it is going, then use Merge
with a maximum concurrency on each split piece, then finally Merge
the results back together. Something like this:
IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
.Select(group => group // group.Key is the TExternalSystem
.Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
.Merge(maxConcurrency: group.Key.MaxConcurrency))
.Merge() // merge the results of each group back together again
.Subscribe(result => ...);
Upvotes: 1
Reputation: 10398
You may want to look into ReactiveUI which includes a rate throttling mechanism for service requests. See http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/
Upvotes: 0