Reputation: 2496
I am trying to combine a Single with a Flowable in RxJava 2 (Kotlin), but nothing happens and I do not understand what is wrong:
Flowable.create<Int>({ emmit ->
    loadNewListener = object : Listener {
        override fun onEmit(id: Int) {
            emmit.onNext(id)
        }
    }
}, BackpressureStrategy.LATEST)
    .debounce(500, TimeUnit.MILLISECONDS)
    .flatMapSingle {
        // `it` is the emitted Int id
        loadNew(id = it)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ data: Data ->
        // use the loaded data
    }, {
        Timber.e("Failed to load data ${it.message}")
    })
My method returns a Single:
private fun loadNew(id: Int): Single<Data> {
    return when (pdfType) {
        CASE_0 -> {
            Single.create<Data> { emmit ->
                service.get("data")
                    .enqueue(
                        object : Callback<Void> {
                            override fun onFailure(call: Call<Void>?, t: Throwable?) {
                                // failure
                            }

                            override fun onResponse(call: Call<Void>?, response: Response<Void>?) {
                                emmit.onSuccess(it.data)
                            }
                        }
                    )
            } // single
        } // case_0
        CASE_1 -> 1Repository.loadsome1Rx(id = id).map { it.getData() }
        CASE_2 -> 2Repository.loadsom2LocalRx(id = id).map { it.getData() }
        else -> {
            throw java.lang.RuntimeException("$this is not available type!")
        }
    }
}
What is wrong with my code? Do I maybe need to call the Single separately inside the Flowable's subscribe(), like this?
Flowable.create<Int>({ emmit ->
    loadNewListener = object : Listener {
        override fun onEmit(id: Int) {
            emmit.onNext(id)
        }
    }
}, BackpressureStrategy.LATEST)
    .debounce(500, TimeUnit.MILLISECONDS)
    .subscribe({
        // `it` is the emitted Int id
        loadNew(id = it)
    }, {
        Timber.e("")
    })
This code works, but it is not as simple as the combined attempt above.
Upvotes: 0
Views: 982
Reputation: 2496
As @Stas Bondar said in the answer below, the simple example based on my code works.
The problem was in loadNewListener.
It was not initialized in time and was still null when it was needed. The Flowable is created when the ViewModel is initialized, but loadNewListener had not yet been assigned when I called it from the fragment.
loadNewListener = object :Listener{...}
This is because the RxJava chain needs some time to initialize: the lambda passed to Flowable.create runs only once the chain is subscribed, and with subscribeOn(Schedulers.io()) it runs on a background thread.
In my case, combining the Flowable with the Single via flatMapSingle took longer to set up than just calling the Single in the Flowable's subscribe.
So I use a temporary field:
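// Holds an id requested before the listener exists
private var userEmitPdfTemp: Temp? = null

fun load(id: Int) {
    val listener = loadNewListener
    if (listener != null) {
        listener.load(id = id)
    } else {
        // Listener is not ready yet: remember the request for later
        userEmitPdfTemp = Temp(id = id)
    }
}

Flowable.create<Data>({ emmit ->
    // Replay the request that arrived before subscription, if any
    userEmitPdfTemp?.let { temp ->
        emmit.onNext(Data(temp.id))
        userEmitPdfTemp = null
    }
    loadNewListener = object : Listener {
        override fun load(id: Int) {
            emmit.onNext(Data(id))
        }
    }
}, BackpressureStrategy.LATEST)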
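The timing issue described above can be reproduced in isolation. The following is only a minimal sketch, assuming RxJava 2 is on the classpath; the `Listener` interface, the top-level `listener` variable, and the `demo` function are hypothetical names made up for illustration, not the code from the question. It shows that the lambda passed to `Flowable.create` does not run when the Flowable is built, only when it is subscribed.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical listener mirroring the one in the question.
interface Listener {
    fun onEmit(id: Int)
}

// Hypothetical field, assigned inside the Flowable.create lambda.
var listener: Listener? = null

// Returns: (listener assigned before subscribe?, assigned after subscribe?, received values)
fun demo(): Triple<Boolean, Boolean, List<Int>> {
    listener = null
    val received = mutableListOf<Int>()

    val flowable = Flowable.create<Int>({ emitter ->
        // This body runs on subscription, not when the Flowable is constructed.
        listener = object : Listener {
            override fun onEmit(id: Int) {
                emitter.onNext(id)
            }
        }
    }, BackpressureStrategy.LATEST)

    val assignedBefore = listener != null   // checked before anyone subscribes

    // No subscribeOn here, so the create lambda runs synchronously inside subscribe().
    flowable.subscribe { id -> received += id }

    val assignedAfter = listener != null
    listener?.onEmit(42)

    return Triple(assignedBefore, assignedAfter, received)
}

fun main() {
    println(demo())
}
```

In this sketch the subscription is synchronous. With `subscribeOn(Schedulers.io())`, as in the question, the assignment happens later on a background thread, so a call from the fragment can arrive while the listener is still null, which is the gap the temporary field is meant to cover.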
Upvotes: 0
Reputation: 6245
This simple example based on your code works:
var i = 0

fun foo() {
    Flowable.create<Int>({ emmit ->
        emmit.onNext(i)
        i++
    }, BackpressureStrategy.LATEST)
        .debounce(500, TimeUnit.MILLISECONDS)
        .flatMapSingle {
            Single.create<String> { emmit ->
                emmit.onSuccess("onSuccess: $it")
            }
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            Log.i("RX", "Subscribe: $it")
        }, {
            it.printStackTrace()
        })
}
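Check that either SingleEmitter.onSuccess() or SingleEmitter.onError() is called in every branch of when (pdfType)... If neither is called, the Single never terminates and the subscriber receives nothing.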
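As an illustration of that check, here is a minimal sketch, assuming RxJava 2, of wrapping a callback-based call in `Single.create` so that every path ends in `onSuccess` or `onError`. The `ResultCallback` interface, the `fetch` function, and its `fail` flag are hypothetical stand-ins for a real asynchronous API such as the Retrofit call in the question.

```kotlin
import io.reactivex.Single

// Hypothetical stand-in for an asynchronous, callback-based API.
interface ResultCallback {
    fun onResponse(body: String?)
    fun onFailure(t: Throwable)
}

// Hypothetical loader; the `fail` flag only simulates an error for this demo.
fun fetch(fail: Boolean, callback: ResultCallback) {
    if (fail) {
        callback.onFailure(RuntimeException("network error"))
    } else {
        callback.onResponse("data")
    }
}

// Every path terminates the Single: success, empty body, or failure.
fun loadAsSingle(fail: Boolean): Single<String> =
    Single.create<String> { emitter ->
        fetch(fail, object : ResultCallback {
            override fun onResponse(body: String?) {
                if (body != null) {
                    emitter.onSuccess(body)
                } else {
                    emitter.onError(IllegalStateException("empty body"))
                }
            }

            override fun onFailure(t: Throwable) {
                // Without this call the Single would never complete on failure.
                emitter.onError(t)
            }
        })
    }
```

In the question's `loadNew`, the `onFailure` override only contains a comment, so on a failed request the Single produced by that branch would stay silent; forwarding the throwable to `emitter.onError` lets the error handler passed to `subscribe` run instead.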
Upvotes: 1