Reputation: 26566
I have a source of messages, which is an `Observable`. For every message I would like to make an HTTP call, which produces another `Observable`, so I combine them with `flatMap` and then sink the results to a subscriber. Here is the code for this scenario:
    Rx.Observable.interval(1000)
      .flatMap (tick) ->
        # returns an `Observable`
        loadMessages()
      .flatMap (message) ->
        # also returns an `Observable`
        makeHttpRequest(message)
      .subscribe (result) ->
        console.info "Processed: ", result
This example is written in CoffeeScript, but I think the problem statement is valid for any other Rx implementation.
The issue I have with this approach is that `loadMessages` produces a lot of messages very quickly. This means that I make a lot of HTTP requests in a very short period of time. This is not acceptable in my situation, so I would like to limit the number of parallel HTTP requests to 10 or so. In other words, I would like to throttle the pipeline or apply some kind of backpressure when I am making HTTP requests.
Is there a standard approach or best practice in Rx for dealing with this kind of situation?
Currently I have implemented a very simple (and pretty suboptimal) backpressure mechanism that ignores a tick if the system has too many messages in processing. It looks like this (simplified version):
    Rx.Observable.interval(1000)
      .filter (tick) ->
        stats.applyBackpressureBasedOnTheMessagesInProcessing()
      .do (tick) ->
        stats.messageIn()
      .flatMap (tick) ->
        # returns an `Observable`
        loadMessages()
      .flatMap (message) ->
        # also returns an `Observable`
        makeHttpRequest(message)
      .do (result) ->
        stats.messageOut()
      .subscribe (result) ->
        console.info "Processed: ", result
I'm not sure, though, whether this can be done better, or whether Rx already has mechanisms in place to deal with this kind of requirement.
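For reference, the `stats` helper used in the snippet could be as small as an in-flight counter with a threshold. This is only a sketch in plain JavaScript. The three method names come from the snippet, while `createStats`, `maxInProcessing`, and `count` are made-up names for illustration:

```javascript
// Hypothetical helper: counts messages that entered the pipeline but have not left it yet.
// `createStats`, `maxInProcessing`, and `count` are illustrative names, not part of Rx.
function createStats(maxInProcessing) {
  let inProcessing = 0;

  return {
    // Used as the `filter` predicate: let a tick through only while below the limit.
    applyBackpressureBasedOnTheMessagesInProcessing() {
      return inProcessing < maxInProcessing;
    },
    // Called when something enters the pipeline.
    messageIn() {
      inProcessing += 1;
    },
    // Called when a result leaves the pipeline; never drops below zero.
    messageOut() {
      inProcessing = Math.max(0, inProcessing - 1);
    },
    // Current number of items considered "in processing".
    count() {
      return inProcessing;
    },
  };
}
```

With `stats = createStats(10)`, ticks are skipped while ten or more items are counted as in processing. Note that this only gates new ticks; it does not cap how many HTTP requests a single tick can start.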
Upvotes: 1
Views: 2697
Reputation: 74654
This isn't strictly backpressure; it is just limiting concurrency. Here's an easy way to do it (ignore my possibly wrong syntax, coding via TextArea):
    Rx.Observable.interval(1000)
      .flatMap (tick) ->
        # returns an `Observable`
        loadMessages()
      .map (message) ->
        # also returns an `Observable`, but only when
        # someone first subscribes to it
        Rx.Observable.defer ->
          makeHttpRequest(message)
      .merge 10 # at a time
      .subscribe (result) ->
        console.info "Processed: ", result
In C#, the equivalent idea is to use `Select(Defer(x)).Merge(n)` instead of `SelectMany`. `Merge(int)` subscribes to at most `n` in-flight Observables and buffers the rest until later. The reason for the `Defer` is to make sure we don't do any work until `Merge(n)` subscribes to us.
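To make the mechanics concrete, here is a minimal plain-JavaScript sketch of the behaviour that deferring plus a bounded merge gives you. It deliberately does not use Rx, and `createLimiter`, `submit`, and the `done` callback are invented names. Each task is a function that starts its work only when called (the role of `Defer`), and at most `limit` tasks run at once while the rest wait in a queue (the role of `Merge(n)`):

```javascript
// Illustration only: not the Rx API. Tasks are deferred (nothing runs until the
// limiter calls them) and at most `limit` of them are in flight at any time.
function createLimiter(limit) {
  const waiting = []; // deferred tasks that have not been started yet
  let active = 0;     // tasks currently in flight

  function startNext() {
    while (active < limit && waiting.length > 0) {
      const task = waiting.shift();
      active += 1;
      let finished = false;
      // The task signals completion by calling `done`.
      task(function done() {
        if (finished) return; // ignore duplicate completion signals
        finished = true;
        active -= 1;
        startNext(); // a free slot lets the next waiting task start
      });
    }
  }

  return {
    submit(task) {
      waiting.push(task);
      startNext();
    },
    activeCount() {
      return active;
    },
    waitingCount() {
      return waiting.length;
    },
  };
}
```

A task here would wrap one HTTP call and invoke `done` from its completion callback, so a burst of messages only ever opens `limit` requests at a time.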
Upvotes: 2
Reputation: 29786
It sounds like you want to pull from a queue rather than push your HTTP requests. Is Rx really the right choice of technology here?
EDIT:
In general, I would not design a solution using Rx where I had complete imperative control over the source events. It's just not a reactive scenario.
The backpressure module in RxJS is clearly written to deal with situations where you don't own the source stream. Here you do.
TPL Dataflow sounds like a far better fit here.
If you must use Rx, you could set up a loop like this: if you want to limit to X concurrent events, set up a Subject to act as your message source and imperatively push (`OnNext`) X messages into it. In your subscriber, you can push a new message to the Subject in each iteration of the `OnNext` handler until the source is exhausted. This guarantees a maximum of X messages in flight.
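The shape of that loop, stripped of Rx entirely, looks roughly like the sketch below. It is plain JavaScript under stated assumptions: `pumpWithLimit`, `handle`, and `done` are illustrative names, the source is any iterable that is pulled from on demand, and `handle` stands in for the subscriber's `OnNext` handler:

```javascript
// Seed-and-refill loop: start `limit` messages, then pull one more from the
// source each time a message finishes. No Rx types are involved.
function pumpWithLimit(source, limit, handle) {
  const iterator = source[Symbol.iterator]();
  let inFlight = 0;
  let exhausted = false;

  function pushNext() {
    if (exhausted) return;
    const step = iterator.next(); // pull the next message only when there is room
    if (step.done) {
      exhausted = true;
      return;
    }
    inFlight += 1;
    // `handle` must call `done` exactly once when the message has been processed.
    handle(step.value, function done() {
      inFlight -= 1;
      pushNext(); // refill: one finished message makes room for one new one
    });
  }

  // Seed the loop with the first `limit` messages.
  for (let i = 0; i < limit; i += 1) pushNext();

  return {
    inFlight() {
      return inFlight;
    },
  };
}
```

Because a new message is pulled only from inside `done`, the number of messages in flight never exceeds `limit`, which is the guarantee described above.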
Upvotes: 1
Reputation: 32182
In RxJS you can use the backpressure submodule:
http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/
Disclaimer: I have never used the JS version of Rx, but you did ask for a standard way of implementing backpressure, and the core library seems to have support for it. Rx for C# does not yet have this support. Not sure why.
Upvotes: 1