IsaacLevon
IsaacLevon

Reputation: 2570

RxJava 2: Why can't PublishProcessor subscribe to an Observable?

I want to implement rather simple DAG in RxJava.

We have a source of items:

Observable<String> itemsObservable = Observable.fromIterable(items)

Next, I'd like to have a processor that will subscribe to itemsObservable and will enable to multiple subscribers to subscribe to it.

So I created:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

Unfortunately, this isn't possible:
itemsObservable.subscribe(itemsProccessor);

Why? What's the proper API to implement this kind of DAG?

Here's a diagram for demonstration:

enter image description here

Here's my (failed) try to implement this kind of DAG:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor); 

Upvotes: 0

Views: 917

Answers (1)

Tuby
Tuby

Reputation: 3253

It's because PublishProcessor implements Subscriber while Observable's subscribe method accepts Observer. You can convert your itemsObservable to Flowable and it will do the job.

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
    PublishProcessor<String> processor = PublishProcessor.create();
    items.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(processor);

Upvotes: 3

Related Questions