tenshi

Reputation: 26566

Message processing throttling/backpressure

I have a source of messages, which is an Observable. For every message I would like to make an HTTP call, which produces another Observable, so I combine them with `flatMap` and then sink the results to some subscriber. Here is the code for this scenario:

Rx.Observable.interval(1000)
.flatMap (tick) ->
  # returns an `Observable`
  loadMessages()
.flatMap (message) ->
  # also returns an `Observable`
  makeHttpRequest(message)
.subscribe (result) ->
  console.info "Processed: ", result

This example is written in CoffeeScript, but I think the problem statement would be valid for any other Rx implementation.

The issue I have with this approach is that loadMessages produces a lot of messages very quickly, which means I make a lot of HTTP requests in a very short period of time. That is not acceptable in my situation, so I would like to limit the number of parallel HTTP requests to 10 or so. In other words, I would like to throttle the pipeline, or apply some kind of backpressure, when making HTTP requests.

Is there any standard approach or best practices for the Rx to deal with this kind of situations?

Currently I have implemented a very simple (and pretty suboptimal) backpressure mechanism that ignores a tick if the system has too many messages in processing. It looks like this (simplified version):

Rx.Observable.interval(1000)
.filter (tick) ->
  stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
  stats.messageIn()
.flatMap (tick) ->
  # returns an `Observable`
  loadMessages()
.flatMap (message) ->
  # also returns an `Observable`
  makeHttpRequest(message)
.do (tick) ->
  stats.messageOut()
.subscribe (result) ->
  console.info "Processed: ", result

I'm not sure though, whether this can be done better, or maybe Rx already has some mechanisms in-place to deal with this kind of requirements.

Upvotes: 1

Views: 2697

Answers (3)

Ana Betts

Reputation: 74654

This isn't strictly backpressure, this is just limiting concurrency. Here's an easy way to do it (ignore my possibly wrong syntax, coding via TextArea):

Rx.Observable.interval(1000)
    .flatMap (tick) ->
        # returns an `Observable`
        loadMessages()
    .map (message) ->
        # also returns an `Observable`, but only when
        # someone first subscribes to it
        Rx.Observable.defer ->
            makeHttpRequest(message)
    .merge 10 # at a time
    .subscribe (result) ->
        console.info "Processed: ", result

In C#, the equivalent idea is: instead of SelectMany, it's Select(Defer(x)).Merge(n). Merge(int) subscribes to at most n in-flight Observables and buffers the rest until later. The reason we have a Defer is to ensure we don't do any work until the Merge(n) subscribes to us.
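The Defer + Merge(n) pattern can be mimicked outside Rx with plain Promises. The sketch below is a hypothetical helper (`mergeWithLimit` and the demo names are mine, not part of any library): the thunks play the role of deferred Observables, since no work starts until a thunk is called, and at most `limit` of them run concurrently while the rest stay buffered.

```javascript
// Hypothetical helper mirroring Select(Defer(x)).Merge(n) with Promises:
// `thunks` are deferred tasks (no work happens until a thunk is called),
// and at most `limit` of them are in flight at any moment.
function mergeWithLimit(thunks, limit) {
  return new Promise((resolve, reject) => {
    const results = new Array(thunks.length);
    let next = 0;   // index of the next thunk to start
    let active = 0; // tasks currently in flight
    let done = 0;   // tasks completed
    if (thunks.length === 0) return resolve(results);
    const launch = () => {
      while (active < limit && next < thunks.length) {
        const i = next++;
        active++;
        thunks[i]().then((value) => {
          results[i] = value;
          active--;
          done++;
          if (done === thunks.length) resolve(results);
          else launch(); // a slot freed up: start the next buffered task
        }, reject);
      }
    };
    launch();
  });
}

// Demo: five fake "HTTP requests", at most two in flight at once.
let inFlight = 0;
let peak = 0;
const task = (n) => () => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise((res) =>
    setTimeout(() => { inFlight--; res(n * 2); }, 10));
};
mergeWithLimit([1, 2, 3, 4, 5].map(task), 2).then((results) => {
  console.log(results, "peak:", peak); // → [ 2, 4, 6, 8, 10 ] peak: 2
});
```

The key design point is the same as with Defer: work is represented as a *function that produces* the async operation, so the pool decides when each request actually starts.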

Upvotes: 2

James World

Reputation: 29786

It sounds like you want to pull from a queue rather than push your http requests. Is Rx really the right choice of technology here?

EDIT:

In general, I would not design a solution using Rx where I had complete imperative control over the source events. It's just not a reactive scenario.

The backpressure module in RxJS is clearly written to deal with situations where you don't own the source stream. Here you do.

TPL Dataflow sounds like a far better fit here.

If you must use Rx, you could set up a loop like this: to limit yourself to X concurrent events, set up a Subject to act as your message source and imperatively push (OnNext) X messages into it. In your subscriber's OnNext handler, push a new message into the subject on each iteration until the source is exhausted. This guarantees a maximum of X messages in flight.
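A minimal non-Rx sketch of this pull loop, assuming a toy `SimpleSubject` stand-in and callback-style completion for the fake HTTP work (all names here are hypothetical, not the real Rx API):

```javascript
// Toy stand-in for a Subject: one observer, push-based onNext.
class SimpleSubject {
  constructor() { this.observer = null; }
  subscribe(onNext) { this.observer = onNext; }
  onNext(message) { this.observer(message); }
}

// Pull loop: prime the subject with `windowSize` messages; each time a
// message finishes processing, pull the next one from the queue.
function runWindowed(messages, handle, windowSize, onComplete) {
  const subject = new SimpleSubject();
  const queue = messages.slice();
  const total = messages.length;
  let finished = 0;
  if (total === 0) return onComplete();
  subject.subscribe((message) => {
    handle(message, () => { // called when this message is done
      finished++;
      if (queue.length > 0) subject.onNext(queue.shift()); // pull next
      else if (finished === total) onComplete();
    });
  });
  // Prime at most `windowSize` messages; no more are ever in flight.
  for (let i = 0; i < windowSize && queue.length > 0; i++) {
    subject.onNext(queue.shift());
  }
}

// Demo: five messages, window of two.
let activeNow = 0;
let maxActive = 0;
runWindowed([1, 2, 3, 4, 5], (msg, done) => {
  activeNow++;
  maxActive = Math.max(maxActive, activeNow);
  setTimeout(() => { activeNow--; done(); }, 5); // fake HTTP request
}, 2, () => {
  console.log("max in flight:", maxActive); // → max in flight: 2
});
```

Because a new message is only pushed when a previous one completes, the window size is enforced without any filtering or dropped ticks.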

Upvotes: 1

bradgonesurfing

Reputation: 32182

In RxJS you can use the backpressure submodule:

http://rxjs.codeplex.com/SourceControl/latest#src/core/backpressure/

Disclaimer: I have never used the JS version of Rx, but you did ask for a standard way of implementing backpressure, and the core library seems to have support for it. Rx for C# does not yet have this support; I'm not sure why.
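The idea behind that submodule is to make the stream request-driven: the consumer pulls values at its own pace instead of having them pushed. A toy stand-in sketches the concept (`ControlledSource` is hypothetical, not the actual RxJS backpressure API):

```javascript
// Toy stand-in for a request-driven ("controlled") stream: nothing is
// delivered until the consumer asks for it with request(n).
class ControlledSource {
  constructor(items) {
    this.pending = items.slice(); // values buffered until requested
    this.observer = null;
  }
  subscribe(onNext) { this.observer = onNext; }
  request(n) {
    // Deliver at most n buffered values to the observer.
    for (let i = 0; i < n && this.pending.length > 0; i++) {
      this.observer(this.pending.shift());
    }
  }
}

// The consumer sets the pace: pull two, process them, then pull more.
const received = [];
const source = new ControlledSource(["a", "b", "c", "d"]);
source.subscribe((v) => received.push(v));
source.request(2);
console.log(received); // → [ 'a', 'b' ]
source.request(10);    // requesting more than remains is fine
console.log(received); // → [ 'a', 'b', 'c', 'd' ]
```

Applied to the question, the subscriber would call `request(1)` each time an HTTP request completes, after an initial `request(10)` to fill the window.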

Upvotes: 1
