Reputation: 2570
I want to implement a rather simple DAG in RxJava.
We have a source of items:
Observable<String> itemsObservable = Observable.fromIterable(items);
Next, I'd like to have a processor that subscribes to itemsObservable
and allows multiple subscribers to subscribe to it.
So I created:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();
Unfortunately, this isn't possible:
itemsObservable.subscribe(itemsProccessor);
Why? What's the proper API to implement this kind of DAG?
Here's a diagram for demonstration:
Here's my (failed) attempt to implement this kind of DAG:
List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);
PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());
processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);
Upvotes: 0
Views: 917
Reputation: 3253
It's because PublishProcessor implements Subscriber, while Observable's subscribe method accepts an Observer. You can convert your itemsObservable to a Flowable and it will do the job.
Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
PublishProcessor<String> processor = PublishProcessor.create();
items.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(processor);
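For the Flowable-based attempt in the question, two more things seem worth checking. The result of processor.doOnNext(...) is discarded, and doOnNext only observes items anyway, so map is probably what was intended. The source should also be connected after the downstream subscribers, since PublishProcessor does not replay earlier items. Below is a minimal sketch of that fan-out, assuming RxJava 2 (the io.reactivex package; RxJava 3 uses io.reactivex.rxjava3). The class name FanOutSketch, the run() helper, and the "first:"/"second:" labels are made up for illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class name; assumes RxJava 2 is on the classpath.
public class FanOutSketch {

    // Runs the pipeline and returns everything both subscribers received.
    static List<String> run() {
        List<String> received = new ArrayList<>();

        Flowable<String> source = Flowable.fromIterable(Arrays.asList("a", "b", "c"));
        PublishProcessor<String> processor = PublishProcessor.create();

        // Operators return a new Flowable, so keep the result and subscribe to it.
        // map transforms each item; doOnNext would only peek at it.
        Flowable<String> upper = processor.map(String::toUpperCase);

        // Two independent subscribers share the same processor.
        upper.subscribe(s -> received.add("first:" + s));
        upper.subscribe(s -> received.add("second:" + s));

        // Connect the source last: PublishProcessor does not replay
        // items to subscribers that arrive after emission.
        source.subscribe(processor);
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Everything here runs synchronously on the calling thread, so a plain ArrayList is enough to collect the results. If the source or the subscribers were moved to other schedulers, a thread-safe collection would be needed instead.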
Upvotes: 3