Reputation: 9072
The simple program below eventually hangs.
// Kotlin
package com.example
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS
fun main() {
fun incr(n: Int): Single<Int> = Single.just(n + 1)
fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
if (n < max)
incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
else
Flowable.empty()
)
numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}
On my laptop it usually hangs somewhere after 23500, example output:
15945
21159
23802
The question is actually two-fold:
Upvotes: 1
Views: 410
Reputation: 9072
A potential solution, based on suggestions in the comments:
package com.example
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS
fun main() {
// doesn't seem to work with PublishProcessor or MulticastProcessor
fun <T> unfold(seed: T, next: (T) -> T?): Flowable<T> =
UnicastProcessor.create<T>().toSerialized().let { proc ->
proc
.startWithItem(seed)
.doOnNext { prev ->
when (val curr = next(prev)) {
null ->
proc.onComplete()
else ->
proc.onNext(curr)
}
}
}
fun numbers(first: Int, max: Int): Flowable<Int> =
unfold(first) { prev -> if (prev < max) prev + 1 else null }
numbers(1, 1_000_000_000)
.sample(1, SECONDS)
.blockingForEach(::println)
}
Upvotes: 1