Nikita Khlebushkin

Reputation: 573

RxJava Zip two observables without completing them

I have two observables. The first comes from the RxAndroidBle library:

Observable<RxBleConnection> bluetoothObservable = RxBleClient.create(getBaseContext())
        .getBleDevice(macAddress)
        .establishConnection(false);

It connects to the device and keeps the connection open for as long as it has subscribers. The second is:

Observable<Response> serverObservable = Observable.fromCallable(() -> callServer());

Then I zip them together:

bluetoothObservable.zipWith(serverObservable, (rxBleConnection, s) -> {
    Log.d(TAG, "zip done");
    return "mock result";
}).subscribe(
    (s) -> {},
    Throwable::printStackTrace
);

But once zip has emitted, bluetoothObservable gets unsubscribed and the connection drops immediately. How can I zip these observables while keeping bluetoothObservable alive/subscribed?

Upvotes: 1

Views: 613

Answers (1)

Dariusz Seweryn

Reputation: 3222

Instead of zip() / zipWith() you can use combineLatest():

Observable.combineLatest(
  bluetoothObservable, 
  serverObservable,
  (rxBleConnection, s) -> {
    Log.d(TAG, "combined");
    return "mock result";
  }
)
  .subscribe(
    (s) -> {},
    Throwable::printStackTrace
  );

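Explanation: zip combines the emissions of both Observables pairwise, in order: first with first, second with second, and so on. Once one of them completes and all of its previous emissions have been matched, there is no point in keeping the subscription to the other Observable, because its subsequent emissions could never be paired. Here serverObservable completes right after its single emission, so zip completes as well and unsubscribes from bluetoothObservable, which closes the connection. combineLatest instead combines the latest emission of each Observable whenever either of them emits, and it normally completes only after both sources have completed, so bluetoothObservable stays subscribed.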
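To make the difference concrete, here is a small library-free sketch of the two termination rules. It is a toy model, not RxJava itself: the class ZipVsCombineLatestModel, its Event and Result types, and the "connection" / "response" strings are all made up for illustration, and it ignores errors, threading and backpressure. It replays a timeline in which source 0 (standing in for bluetoothObservable) emits once and stays open, while source 1 (standing in for serverObservable) emits once and completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Library-free toy model, NOT RxJava code: it only mimics when a two-source
// zip / combineLatest emits and when it terminates (unsubscribing from both sources).
class ZipVsCombineLatestModel {

    /** One timeline entry: source 0 or 1 emits a value; a null value means that source completed. */
    static final class Event {
        final int source;
        final String value;
        Event(int source, String value) { this.source = source; this.value = value; }
    }

    /** What the modelled operator emitted and whether it terminated. */
    static final class Result {
        final List<String> emitted = new ArrayList<>();
        boolean terminated = false;
    }

    /** zip: pair items strictly in order; stop once a completed source has nothing left to pair. */
    static Result zip(List<Event> timeline) {
        Result result = new Result();
        Deque<String> first = new ArrayDeque<>();
        Deque<String> second = new ArrayDeque<>();
        boolean[] completed = new boolean[2];
        for (Event e : timeline) {
            if (e.value == null) {
                completed[e.source] = true;
            } else {
                (e.source == 0 ? first : second).add(e.value);
            }
            // Emit every pair that is currently available.
            while (!first.isEmpty() && !second.isEmpty()) {
                result.emitted.add(first.poll() + "+" + second.poll());
            }
            // A completed source with an empty queue can never contribute to another pair.
            if ((completed[0] && first.isEmpty()) || (completed[1] && second.isEmpty())) {
                result.terminated = true;
                return result;
            }
        }
        return result;
    }

    /** combineLatest: emit on every item once both sources have a value; stop when both completed. */
    static Result combineLatest(List<Event> timeline) {
        Result result = new Result();
        String[] latest = new String[2];
        boolean[] completed = new boolean[2];
        for (Event e : timeline) {
            if (e.value == null) {
                completed[e.source] = true;
                // Terminate if this source never emitted, or if both sources are now done.
                if (latest[e.source] == null || (completed[0] && completed[1])) {
                    result.terminated = true;
                    return result;
                }
            } else {
                latest[e.source] = e.value;
                if (latest[0] != null && latest[1] != null) {
                    result.emitted.add(latest[0] + "+" + latest[1]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> timeline = new ArrayList<>();
        timeline.add(new Event(0, "connection")); // stand-in for the RxBleConnection emission
        timeline.add(new Event(1, "response"));   // stand-in for the server Response
        timeline.add(new Event(1, null));         // fromCallable completes after its single item

        Result zipped = zip(timeline);
        Result combined = combineLatest(timeline);
        System.out.println("zip: " + zipped.emitted + " terminated=" + zipped.terminated);
        System.out.println("combineLatest: " + combined.emitted + " terminated=" + combined.terminated);
    }
}
```

Both modelled operators emit the single pair, but only the zip model terminates on the completion of source 1. That corresponds to the unsubscription from bluetoothObservable described in the question. With combineLatest the connection would be expected to stay open until you unsubscribe yourself.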

Upvotes: 2
