Serg Burlaka
Serg Burlaka

Reputation: 2496

Single with flowable?

Try in rxJava2 Kotlin combine Single with Flowable but nothing not happening: Does not undrstand what wrong

  Flowable.create<Int>({ emmit ->

            loadNewListener = object :Listener {
                override fun onEmit(id: Int) {
                    emmit.onNext(id)
                }
            }
        }, BackpressureStrategy.LATEST)
                .debounce(500, TimeUnit.MILLISECONDS)
                .flatMapSingle {
                    loadNew(id = it.id)
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ (data:Data) ->

                }, {

                    Timber.e("Failed load data ${it.message}")
                })

my method is returning Single:

private fun loadNew(id: Int): Single<Data> {

            return when (pdfType) {

                CASE_0 -> {

                    Single.create<Data> { emmit ->

             service.get("data")
                    .enqueue(
                    object : Callback<Void> {
                         override fun onFailure(call: Call<Void>?, t: Throwable?) {
                            // failure
                        }

                          override fun onResponse(call: Call<Void>?, response:  Response<Void>?) {
                 emmit.onSuccess(it.data)
            }
                        }
                    }//single
                }//case_0


                CASE_1 -> 1Repository.loadsome1Rx(id = id).map { it.getData() }

                CASE_2 -> 2Repository.loadsom2LocalRx(id = id).map { it.getData() }

                else -> {
                    throw java.lang.RuntimeException("$this is not available type!")
                }
            }

What is wrong im my code? Need Maby call Single in Flowable subscribe() seppurate like this?

Flowable.create<Int>({ emmit ->
        loadNewListener = object :Listener {
            override fun onEmit(id: Int) {
                emmit.onNext(id)
            }
        }
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)

          .subscribe({
              loadNew(id = it.id)

          }, {
              Timber.e("")
          })

This code is workin but looks not simple as via combine try.

Upvotes: 0

Views: 982

Answers (2)

Serg Burlaka
Serg Burlaka

Reputation: 2496

As @Stas Bondar said in answer below This simple example based on your code is working!!

Problem was in loadNewListener .

It does not init in time and has null value when need. Call create Flowable on init ViewModel but loadNewListener did not have time to create when i call him from fragment.

loadNewListener = object :Listener{...}

Becuse need some time mutch for init rxJava expression!

And combine flowable with single via flatMapSingle spent more time than just call single on flowable dubscrinbe!

So use temp field:

     private var temp: Temp? = null

        fun load(id: Int) {

 loadNewListener.apply {

                when {
                    this != null -> load(id = id)

                    else -> userEmitPdfTemp = Temp(id = id)
                }
            }
        }




 Flowable.create<Data>({ emmit ->

                    userEmitPdfTemp?.let {id->

                        emmit.onNext(Data(id))
                        userEmitPdfTemp =null

                    }

                    loadNewListener = object :Listener {
                        override fun load(id: Int) {

                            emmit.onNext(Data(id))

                        }
                    }

                }

Upvotes: 0

Stanislav Bondar
Stanislav Bondar

Reputation: 6245

This simple example based on your code is working

var i = 0
fun foo() {
    Flowable.create<Int>({ emmit ->
        emmit.onNext(i)
        i++
    }, BackpressureStrategy.LATEST)
            .debounce(500, TimeUnit.MILLISECONDS)
            .flatMapSingle {
                Single.create<String> { emmit ->
                    emmit.onSuccess("onSuccess: $it")
                }
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.i("RX", "Subscribe: $it")
            }, {
                it.printStackTrace()
            })
}

Check SingleEmitter.onSuccess() and SingleEmitter.onError() is called in all cases in when (pdfType)...

Upvotes: 1

Related Questions