mynameisteodora
mynameisteodora

Reputation: 41

RxAndroidBle 2 subscribe to two different characteristics in Kotlin

I apologise if this is a duplicate question, I could not find a helpful answer anywhere. I am also a beginner in reactive programming so I am sorry in advance for silly questions.

I am using rxandroidble2 in a Kotlin Android app. I can successfully connect to a BLE device if I only want to read from one characteristic. However, I cannot get it to work for connecting to two separate characteristics.

Characteristic A sends packets every two seconds. Characteristic B sends packets multiple times per second. I can connect to both of them if I do it separately, i.e. run the app with the connection made for only CharA or CharB.

The solutions I have tried are all from stackoverflow, in particular I have tried:

My code is this:

respeckLiveSubscription = connectionObservable?.flatMap{
            Observable.combineLatest(
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_A_UUID)),
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_B_UUID)),
                io.reactivex.functions.BiFunction<Observable<ByteArray>, Observable<ByteArray>, Pair<Observable<ByteArray>, Observable<ByteArray>>> {
                    t1, t2 -> Pair.create(t1, t2)
                }
            )
        }?.subscribe({
            processRESpeckPacket(it.first.blockingFirst(), respeckVersion, this)
            Log.i("ble", "processed first packet")
            processRESpeckPacket(it.second.blockingFirst(), 7, this)
            Log.i("ble", "processed second packet")
        },{
            Log.i("ble", "error when connecting = " + it.stackTrace)
        })

        Log.i("ble", "does this mean connection is over")

I have a few issues:

Please help me out I have been stuck with this forever! How can I get it to work in Kotlin?

Thanks a lot.

Upvotes: 1

Views: 644

Answers (1)

Dariusz Seweryn
Dariusz Seweryn

Reputation: 3222

Is the fact that my characteristics are sent at different frequencies causing all the problems?

Yes. Observable<ByteArray> emitted from setupNotification() is a hot Observable — it does emit whether you listen or not — same as a radio. Using .blocking*() operators blocks your listening thread and if the other Observable would emit during blocked time you would miss it.

I also tried most of the solutions above in Java and I am getting the same behaviour. Did something change in rxandroidble2 that is not explained in the stackoverflow answers that I linked to?

Changes between RxJava 1 and 2 were mostly cosmetic with changes from Observable to Single or Completable where appropriate. Some operators were deleted or changed their name but it is not your case.

If you only need latest values emitted by both of your characteristics you can use code like this:

respeckLiveSubscription = connectionObservable?.flatMap{
            Observable.combineLatest(
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_A_UUID)).flatMap { it }, // notice flatMap
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_B_UUID)).flatMap { it }, // notice flatMap
                io.reactivex.functions.BiFunction<ByteArray, ByteArray, Pair<ByteArray, ByteArray>> {
                    t1, t2 -> Pair.create(t1, t2) // now we have 'ByteArray's here, not 'Observable<ByteArray>'s 
                }
            )
        }?.subscribe({
            processRESpeckPacket(it.first, respeckVersion, this)
            Log.i("ble", "processed first value")
            processRESpeckPacket(it.second, 7, this)
            Log.i("ble", "processed second value")
        },{
            Log.i("ble", "error when connecting = " + it.stackTrace)
        })

        Log.i("ble", "does this mean connection is over") // no it is only subscribed

Upvotes: 1

Related Questions