Adam Arold
Adam Arold

Reputation: 30538

How can I explicitly signal completion of a Flowable in RxJava?

I'm trying to create a Flowable which is wrapping an Iterable. I push elements to my Iterable periodically but it seems that the completion event is implicit. I don't know how to signal that processing is complete. For example in my code:

    // note that this code is written in Kotlin
    val iterable = LinkedBlockingQueue<Int>()
    iterable.addAll(listOf(1, 2, 3))

    val flowable = Flowable.fromIterable(iterable)
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.computation())

    flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})

    iterable.add(4)

    Thread.sleep(1000)

    iterable.add(5)

    Thread.sleep(1000)

This prints:

1 2 3 4 completed

I checked the source of the Flowable interface but it seems that I can't signal that a Flowable is complete explicitly. How can I do so? In my program I publish events which have some delay between them and I would like to be explicit when to complete the event flow.

Clarification: I have a long running process which emits events. I gather them in a queue and I expose a method which returns a Flowable which wraps around my queue. The problem is that there might be already elements in the queue when I create the Flowable. I will process the events only once and I know when the flow of events stops so I know when I need to complete the Flowable.

Upvotes: 5

Views: 4229

Answers (2)

Geoffrey Marizy
Geoffrey Marizy

Reputation: 5521

Like suggested by akarnokd, ReplayProcessor do exactly what you want. Replace iterable.add(item) with processor.onNext(item), and call processor.onComplete() when you are done.

Upvotes: 2

Magnus
Magnus

Reputation: 8300

Using .fromIterable is the wrong way to create a Flowable for your use case.
Im not actually clear on what that use case is, but you probably want to use Flowable.create() or a PublishSubject

val flowable = Flowable.create<Int>( {
    it.onNext(1)
    it.onNext(2)
    it.onComplete()
}, BackpressureStrategy.MISSING)

val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
//This data will be dropepd unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()

Of course how you deal with back-pressure will depend on the nature of the source of data.

Upvotes: 4

Related Questions