Abhijit Sarkar

Reputation: 24518

RxJava: How to implement a slow consumer with buffer and a fast consumer to the same source?

While playing with various backpressure scenarios, I implemented a case where one subscriber is slow and sits behind a buffer, while another consumes whatever is thrown at it. That was using Scala and Akka Streams. You can see the code here if you want to, and the test that runs it here.

I usually try to develop an RxJava version for comparison, but I got stuck on this one. In Akka Streams, I can build a graph with one source that broadcasts on two channels, and have a slow sink and a fast sink consume from those channels. Each channel can apply buffering and throttling independently. In RxJava, there is the share operator for broadcasting, but the buffering and throttling logic lives on the Observable, not on the Subscriber. So I'm not sure how to apply buffering and throttling for one subscriber without affecting the other. Since Akka Streams and RxJava are both reactive streaming libraries, I'm hoping there's a way to get what I want.

Here's a pictorial version of what I'm trying to do.

Upvotes: 2

Views: 1530

Answers (2)

Dev

Reputation: 12196

You should be able to decorate the share()d Observable differently for each subscriber if you want different backpressure behaviors.

For instance:

// interval emits Long values, so the element type is Long, not Integer.
Observable<Long> source = Observable.interval(0, 1, TimeUnit.SECONDS).share();

// Naked source for fast consumers.
Observable<Long> fast = source;

// Buffer for slow consumers that use backpressure.
Observable<Long> slow = source.onBackpressureBuffer();

Subscribers to fast and slow above will ultimately use the same shared source.

Note that fast does not respond to backpressure because interval does not respond to backpressure.

The other onBackpressureXXX() operators, such as onBackpressureDrop() and onBackpressureLatest(), give you different behaviors.

Upvotes: 0

tmn

Reputation: 11539

Something like this?

import rx.Observable;
import rx.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {

        //emits Long every 100 milliseconds, and publishes to all Subscribers simultaneously through ConnectableObservable
        ConnectableObservable<Long> source = Observable.interval(100, TimeUnit.MILLISECONDS).publish();

        //buffers emissions into Lists within 1 second durations, and first Subscriber prints them
        source.buffer(1,TimeUnit.SECONDS).subscribe(System.out::println);

        //no buffering, just push emissions directly to this second Subscriber which prints them
        source.subscribe(System.out::println);

        //start firing emissions now both Subscribers are connected
        source.connect();

        //sleep to keep program alive for 10 seconds
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The Subscriber has no notion of throttling or of any other operator. Those are applied on the Observable side through various operators, each of which yields a different Observable implementation. The Subscriber is pretty dumb and simply consumes the emission as the final step in an Observable chain. It is agnostic to which thread the emission arrives on, much less whether it has been throttled by an Observable passing items to it.
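To make that split concrete, here is a minimal library-free sketch of the idea. It is plain JDK code, not RxJava and not how RxJava implements its operators; the class PerSubscriberBuffer and the helpers buffered, pause, and run are hypothetical names made up for illustration. The buffering stage wraps one consumer only, so the other consumer still receives items directly from the source, and neither consumer contains any buffering logic itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PerSubscriberBuffer {

    // Hypothetical helper: wraps a consumer so that each item is queued and
    // handed over on the worker's thread. The single-thread executor's
    // unbounded task queue plays the role of the buffer and keeps item order.
    static <T> Consumer<T> buffered(Consumer<T> downstream, ExecutorService worker) {
        return item -> worker.execute(() -> downstream.accept(item));
    }

    // Simulates a slow consumer's processing time.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Emits 0..count-1 to a fast and a slow consumer, waits for the slow side
    // to drain its buffer, and returns [itemsSeenByFast, itemsSeenBySlow].
    static List<List<Long>> run(int count, long slowDelayMillis) {
        List<Long> fastSeen = new ArrayList<>();
        List<Long> slowSeen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // The fast consumer is called directly on the emitting thread.
        Consumer<Long> fast = fastSeen::add;

        // The slow consumer is plain too; only its wrapper knows about buffering.
        Consumer<Long> slow = buffered((Long item) -> {
            pause(slowDelayMillis);
            slowSeen.add(item);
        }, worker);

        // One source loop broadcasting every item to both consumers.
        for (long i = 0; i < count; i++) {
            fast.accept(i);
            slow.accept(i);
        }

        worker.shutdown();
        try {
            if (!worker.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("slow consumer did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(fastSeen, new ArrayList<>(slowSeen));
    }

    public static void main(String[] args) {
        List<List<Long>> seen = run(5, 100);
        System.out.println("fast saw " + seen.get(0));
        System.out.println("slow saw " + seen.get(1));
    }
}
```

The source loop never waits on the slow consumer, because slow.accept only enqueues the item. In RxJava terms, the wrapper stands in for the operators you would chain onto one branch of the shared Observable, while the other branch stays undecorated.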

Upvotes: 1
