Andrey

Reputation: 9072

How can I recursively generate an RxJava Flowable using only operators from the core library?

The simple program below eventually hangs.

// Kotlin
package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    fun incr(n: Int): Single<Int> = Single.just(n + 1)

    fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
        if (n < max)
            incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
        else
            Flowable.empty()
    )

    numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}

On my laptop it usually hangs somewhere after 23500. Example output:

15945
21159
23802

The question is actually two-fold:

Upvotes: 1

Views: 410

Answers (1)

Andrey

Reputation: 9072

A potential solution, based on suggestions in the comments:

package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
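    // Each emitted value is fed back into the same processor to produce its successor;
    // a null successor completes the stream.
    // doesn't seem to work with PublishProcessor or MulticastProcessor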
    fun <T> unfold(seed: T, next: (T) -> T?): Flowable<T> =
        UnicastProcessor.create<T>().toSerialized().let { proc ->
            proc
                .startWithItem(seed)
                .doOnNext { prev ->
                    when (val curr = next(prev)) {
                        null ->
                            proc.onComplete()
                        else ->
                            proc.onNext(curr)
                    }
                }
        }

    fun numbers(first: Int, max: Int): Flowable<Int> =
        unfold(first) { prev -> if (prev < max) prev + 1 else null }

    numbers(1, 1_000_000_000)
        .sample(1, SECONDS)
        .blockingForEach(::println)
}
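
For comparison, when the step function is synchronous (as in `unfold` above, and unlike the `Single`-returning `incr` in the question), the same unfolding can probably be written with the core `Flowable.generate` operator, which only produces items when downstream requests them. This is a minimal sketch, not a tested replacement for the processor-based version; the name `unfold` is reused here purely for illustration.

```kotlin
import io.reactivex.rxjava3.core.Emitter
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.functions.Supplier

// Illustrative helper (not part of RxJava): emits seed, next(seed), next(next(seed)), ...
// and completes after the first value for which next returns null.
fun <T : Any> unfold(seed: T, next: (T) -> T?): Flowable<T> =
    Flowable.generate<T, T>(
        // The generator state is the value to emit on the next turn.
        Supplier { seed },
        BiFunction<T, Emitter<T>, T> { current, emitter ->
            // Exactly one onNext per generator turn, as generate requires.
            emitter.onNext(current)
            val following = next(current)
            if (following == null) {
                // No successor: finish the stream; the returned state is no longer used.
                emitter.onComplete()
                current
            } else {
                following
            }
        }
    )
```

With this definition, `unfold(1) { prev -> if (prev < 5) prev + 1 else null }.toList().blockingGet()` should yield `[1, 2, 3, 4, 5]`. It does not cover an asynchronous step such as `incr(n).observeOn(...)`; that case still needs a feedback loop like the processor approach above.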

Upvotes: 1
