Reputation: 24518
While playing with various backpressure scenarios, I implemented a case where one subscriber is slow and sits behind a buffer, while another consumes whatever is thrown at it. That was using Scala and Akka Streams. You can see the code here, and the test that runs it here.
I usually try to develop an RxJava version for comparison, but I got stuck on this one. In Akka Streams, I can build a graph with one source that broadcasts on 2 channels, and have a slow sink and a fast sink consume from those channels. Each channel can independently apply buffering and throttling. In RxJava, there is the share operator for broadcasting, but the buffering and throttling logic is not on the Subscriber but on the Observable. Thus I'm not sure how to apply buffering and throttling without affecting both subscribers. Since both Akka Streams and RxJava are implementations of Rx, I'm hoping there's a way to get what I want.
Here's a pictorial version of what I'm trying to do.
Upvotes: 2
Views: 1530
Reputation: 12196
You should be able to decorate the shared observable (the result of share()) in different ways for different subscribers if you want different backpressure behaviors.
For instance:
// interval() emits Long values, so the element type is Long, not Integer.
Observable<Long> source = Observable.interval(0, 1, TimeUnit.SECONDS).share();
// Naked source for fast consumers.
Observable<Long> fast = source;
// Buffer for slow consumers that use backpressure.
Observable<Long> slow = source.onBackpressureBuffer();
Subscribers to fast and slow above will ultimately use the same shared source.
Note that fast does not respond to backpressure because interval does not respond to backpressure.
There are different flavors of onBackpressureXXX(), such as onBackpressureDrop() and onBackpressureLatest(), that give you different behaviors.
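To make the per-subscriber decoration concrete, here is a minimal sketch, assuming RxJava 1.x. The class name PerSubscriberBackpressure and the demo() method are made up for illustration, a PublishSubject stands in for the real source so the emissions are deterministic, and TestSubscriber is used only to simulate a consumer that requests items at its own pace.

```java
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class PerSubscriberBackpressure {

    // Returns three snapshots: everything the fast subscriber saw, what the
    // slow subscriber saw before requesting anything, and what it saw after
    // requesting two items.
    static List<List<Long>> demo() {
        // Stand-in for the real source; share() multicasts it to all subscribers.
        PublishSubject<Long> subject = PublishSubject.create();
        Observable<Long> source = subject.share();

        // Fast consumer: subscribes to the naked source with unbounded demand.
        TestSubscriber<Long> fast = new TestSubscriber<>();
        source.subscribe(fast);

        // Slow consumer: starts with zero demand. onBackpressureBuffer() requests
        // everything from upstream and holds items until the subscriber asks.
        TestSubscriber<Long> slow = new TestSubscriber<>(0L);
        source.onBackpressureBuffer().subscribe(slow);

        subject.onNext(1L);
        subject.onNext(2L);
        subject.onNext(3L);

        List<List<Long>> snapshots = new ArrayList<>();
        snapshots.add(new ArrayList<>(fast.getOnNextEvents()));
        snapshots.add(new ArrayList<>(slow.getOnNextEvents()));

        // The slow consumer now pulls two of the buffered items.
        slow.requestMore(2);
        snapshots.add(new ArrayList<>(slow.getOnNextEvents()));
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The buffer sits only on the slow branch, so the fast subscriber is never held back by the slow one. Swapping onBackpressureBuffer() for another onBackpressureXXX() operator changes what happens to items the slow consumer has not yet requested.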
Upvotes: 0
Reputation: 11539
Something like this?
import rx.Observable;
import rx.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        // emits a Long every 100 milliseconds, and publishes to all Subscribers simultaneously through ConnectableObservable
        ConnectableObservable<Long> source = Observable.interval(100, TimeUnit.MILLISECONDS).publish();

        // buffers emissions into Lists within 1 second durations, and the first Subscriber prints them
        source.buffer(1, TimeUnit.SECONDS).subscribe(System.out::println);

        // no buffering, just push emissions directly to this second Subscriber, which prints them
        source.subscribe(System.out::println);

        // start firing emissions now that both Subscribers are connected
        source.connect();

        // sleep to keep the program alive for 10 seconds
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The Subscriber has no notion of throttling or of any operators. Those are applied on the Observable side through various operators, each of which yields a different Observable implementation. The Subscriber is pretty dumb and simply consumes the emissions as the final step in an Observable chain. It is agnostic to which thread an emission arrives on, much less whether it's been throttled or not by the Observable passing items to it.
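A deterministic variant of the same idea can be sketched with a TestScheduler, assuming RxJava 1.x. The class name BranchOperators, the demo() method, and the 250 ms buffer window are illustrative choices, picked so that no buffer boundary coincides with a tick inside the simulated 450 ms.

```java
import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class BranchOperators {

    // Returns "<raw emissions> | <buffered batches>" after 450 ms of virtual time.
    static String demo() {
        // Virtual clock, so the example does not depend on real timing.
        TestScheduler scheduler = new TestScheduler();

        // One source, ticking every 100 ms of virtual time, multicast via publish().
        ConnectableObservable<Long> source =
                Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).publish();

        // Branch 1: buffer() returns a new Observable; only this branch is batched.
        List<List<Long>> batches = new ArrayList<>();
        source.buffer(250, TimeUnit.MILLISECONDS, scheduler).subscribe(batches::add);

        // Branch 2: the plain source, unaffected by the operator on branch 1.
        List<Long> raw = new ArrayList<>();
        source.subscribe(raw::add);

        source.connect();

        // Ticks fire at 100, 200, 300 and 400 ms; the first buffer closes at 250 ms.
        scheduler.advanceTimeBy(450, TimeUnit.MILLISECONDS);

        return raw + " | " + batches;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both Subscribers are plain callbacks. All of the shaping lives in the Observable each one happens to subscribe to.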
Upvotes: 1