Reputation: 573
I have two observables. The first comes from the RxAndroidBle library:
Observable<RxBleConnection> bluetoothObservable = RxBleClient.create(getBaseContext())
    .getBleDevice(macAddress)
    .establishConnection(false);
It connects to the device and keeps the connection open for as long as it has subscribers. The second is:
Observable<Response> serverObservable = Observable.fromCallable(() -> callServer());
Then I zip them together:
bluetoothObservable.zipWith(serverObservable, (rxBleConnection, s) -> {
    Log.d(TAG, "zip done");
    return "mock result";
}).subscribe((s) -> {},
    Throwable::printStackTrace);
But after the zip, bluetoothObservable gets unsubscribed and the connection drops immediately. What should I do to zip those observables and keep bluetoothObservable alive/subscribed?
Upvotes: 1
Views: 613
Reputation: 3222
Instead of .zip() you can use:
Observable.combineLatest(
        bluetoothObservable,
        serverObservable,
        (rxBleConnection, s) -> {
            Log.d(TAG, "combined");
            return "mock result";
        }
    )
    .subscribe(
        (s) -> {},
        Throwable::printStackTrace
    );
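Explanation: zip tries to combine outputs from both Observables one by one. If one of them completes and all of its previous emissions have already been matched, there is no point in keeping the subscription to the other Observable, because its subsequent emissions will never be used. Here serverObservable (built with fromCallable) emits a single item and then completes, so zip completes as well and unsubscribes from bluetoothObservable. combineLatest instead pairs each new emission with the latest emission from the other Observable and, once every source has emitted, completes only when all sources have completed, so the connection stays subscribed.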
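To make the termination rules above concrete, here is a rough plain-Java sketch that only models when each operator would drop its subscriptions. It does not use RxJava at all: the class CompletionRules, the Event enum and both method names are made up for illustration, with source A standing in for the BLE connection and source B for the server call.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model of when a two-source zip or combineLatest terminates.
 * Hypothetical illustration only; this is not RxJava's implementation.
 */
final class CompletionRules {

    /** Events in this model: source A or B either emits an item or completes. */
    enum Event { A_NEXT, A_COMPLETE, B_NEXT, B_COMPLETE }

    /** Returns true if a zip of A and B would still be subscribed after the events. */
    static boolean zipStaysSubscribed(List<Event> events) {
        int pendingA = 0, pendingB = 0;       // emitted items not yet paired
        boolean doneA = false, doneB = false; // whether each source has completed
        for (Event e : events) {
            switch (e) {
                case A_NEXT: pendingA++; break;
                case B_NEXT: pendingB++; break;
                case A_COMPLETE: doneA = true; break;
                case B_COMPLETE: doneB = true; break;
            }
            // zip pairs items one by one, in order
            int matched = Math.min(pendingA, pendingB);
            pendingA -= matched;
            pendingB -= matched;
            // a completed source with nothing left to pair ends the whole zip
            if ((doneA && pendingA == 0) || (doneB && pendingB == 0)) {
                return false;
            }
        }
        return true;
    }

    /** Returns true if a combineLatest of A and B would still be subscribed. */
    static boolean combineLatestStaysSubscribed(List<Event> events) {
        boolean seenA = false, seenB = false; // whether each source has emitted
        boolean doneA = false, doneB = false; // whether each source has completed
        for (Event e : events) {
            switch (e) {
                case A_NEXT: seenA = true; break;
                case B_NEXT: seenB = true; break;
                case A_COMPLETE: doneA = true; break;
                case B_COMPLETE: doneB = true; break;
            }
            // ends when both sources are done, or when one finished without emitting
            if ((doneA && doneB) || (doneA && !seenA) || (doneB && !seenB)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Connection emitted, server call emitted once and then completed
        List<Event> events = Arrays.asList(Event.A_NEXT, Event.B_NEXT, Event.B_COMPLETE);
        System.out.println("zip stays subscribed: " + zipStaysSubscribed(events));
        System.out.println("combineLatest stays subscribed: " + combineLatestStaysSubscribed(events));
    }
}
```

For the event sequence in main (connection emits, server call emits and completes), the zip model reports false and the combineLatest model reports true, which matches the behaviour described in the question and in the explanation.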
Upvotes: 2