lannyf
lannyf

Reputation: 11035

kotlin, got “Type mismatch. Required: Disposable? Found: Unit” when using observer object instance in the subscribe()

Edit:

based on Dmitry Ikryanov's suggestion, using DisposableObserver will compile, but it causes crash

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to 
subscribe with a(n) com.DataManager$theObserver$1 multiple times. Please 
create a fresh instance of com.DataManager$theObserver$1 and subscribe that 
to the target source instead.

the only code of subecribWith(), which has been called only once

fun initSession() {
    if (mDisposable != null && mDisposable!!.isDisposed) {
        mDisposable!!.dispose()
    }

    mDisposable = RxBus.listen(DataEvent::class.java).subscribeWith(theObserver)  <=== crash at here
}

the DisposableObserver is a member variable of the class:

var theObserver: DisposableObserver<DataEvent> = object : DisposableObserver<DataEvent>() {
    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")        }

    override fun onNext(t: DataEvent) {
        Log.e(TAG, "Next: " + t)
        onDataReady(t)        }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }
}

===

Original question:

trying to use RxJava subscribe() in kotlin, get an error “Type mismatch. Required: Disposable? Found: Unit”, not sure what it means, anyone knows?

class DataEvent {}

using RxBus

object RxBus {

private val publisher = PublishSubject.create<Any>()

fun publish(event: Any) {
    publisher.onNext(event)
}

// Listen should return an Observable and not the publisher
// Using ofType we filter only events that match that class type
fun <T> listen(eventType: Class<T>): Observable<T> = publisher.ofType(eventType)

}

when call like this, it is ok:

mDisposable = RxBus.listen(DataEvent::class.java).subscribe({
        onDataReady(it)
    })

but when call the RxBus.listen(DataEvent::class.java).subscribe(observer) with defined observer instance it shows red underline: “Type mismatch. Required: Disposable? Found: Unit”

mDisposable = RxBus.listen(DataEvent::class.java).subscribe(observer)

the observer is:

var observer: Observer<DataEvent> = object : Observer<DataEvent> {
    override fun onSubscribe(d: Disposable) {
        Log.e(TAG, "onSubscribe: ")
    }

    override fun onNext(@NonNull t: DataEvent) {
        Log.e(TAG, "onNext: " + t)
        onDataReady(t)
    }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError: ")
    }

    override fun onComplete() {
        Log.e(TAG, "onComplete: All Done!")
     }
}

Upvotes: 1

Views: 6044

Answers (1)

Dmitry Ikryanov
Dmitry Ikryanov

Reputation: 322

It's because in RxJava 2.0 method subscribe(observer) was changed and return nothing.

Unlike the Observable of version 1.x, subscribe(Observer) does not allow external cancellation of a subscription and the Observer instance is expected to expose such capability.

You can use subscribeWith(observer).
Example:

val disposable = Observable.just("Hello world!")
                .delay(1, TimeUnit.SECONDS)
                .subscribeWith(object : DisposableObserver<String>() {
                    public override fun onStart() {
                        println("Start!")
                    }

                    fun onNext(t: Int?) {
                        println(t)
                    }

                    override fun onError(t: Throwable) {
                        t.printStackTrace()
                    }

                    override fun onComplete() {
                        println("Done!")
                    }
                })

Upvotes: 4

Related Questions