Reputation: 280
I have the following problem:
I have a observable that is doing some work, but other observables need the output of that observable to work. I have tried to subscribe several times on the same observable, but inside the log i see that the original observable is initiated multiple times.
thats my observable thats create the object:
Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
if (mApi == null) {
//do some work
}
subscriber.onNext(mApi);
subscriber.unsubscribe();
})
thats my observable that needs the object
loadApi().flatMap(api -> api....()));
I'm using
.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io()
on all observables.
Upvotes: 7
Views: 5428
Reputation: 30335
I'm not sure that I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a Connectable Observable like so:
ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Alternatively, you could use a PublishSubject:
PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
Both of these examples are single threaded, but you can easily add observeOn or subscirbeOn calls to make them async.
Upvotes: 14
Reputation: 3083
First of all using Observable.create is tricky and easy to get wrong. You need something like
Observable.create(subscriber -> {
if (mApi == null) {
//do some work
}
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(mApi);
subscriber.onCompleted();
// Not subscriber.unsubscribe();
}
})
You can use
ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect();
All subsequent subscribers should get the single emitted item
obs.subscribe(item -> System.out.println("Sub 1 " + item));
obs.subscribe(item -> System.out.println("Sub 2 " + item));
obs.subscribe(item -> System.out.println("Sub 3 " + item));
obs.subscribe(item -> System.out.println("Sub 4 " + item));
Upvotes: 1