Reputation: 1041
can somebody explain me how to pass onComplete signal over flatMap operator in RxJava?
If flatMap operator is commented I can get list of numbers from 1 to 10 it means toList will receive onComplete signal. But when I want to process data further in flatMap it will consume onComplete signal and I can't get any result. What can I do to pass onComplete signal over flatMap operator?
I have following simple program:
fun main(args: Array<String>) {
notify()
.flatMapMaybe { processData(it) }
.toList()
.subscribe(
{ println("onNext: $it") },
{ println("onError: ${it.message}") }
)
}
fun notify(): Flowable<Int> {
return Flowable.create({ emitter ->
val random = Random()
for (index in 1..10) {
emitter.onNext(index)
Thread.sleep((random.nextInt(500)).toLong())
}
emitter.onComplete()
}, BackpressureStrategy.BUFFER)
}
fun processData(data: Int): Maybe<String> {
return Maybe.fromCallable { data }
.flatMap {
if (it.mod(2) == 0) {
Maybe.fromCallable { it.toString() }
} else {
Maybe.never()
}
}
}
Upvotes: 1
Views: 232
Reputation: 2085
Instead of returning Maybe.never()
use Maybe.empty()
. According to documentation, Maybe.empty()
should post onComplete()
immediately.
Upvotes: 2