Reputation: 30538
I'm trying to create a Flowable which wraps an Iterable. I push elements to my Iterable periodically, but it seems that the completion event is implicit. I don't know how to signal that processing is complete. For example, in my code:
// note that this code is written in Kotlin
val iterable = LinkedBlockingQueue<Int>()
iterable.addAll(listOf(1, 2, 3))
val flowable = Flowable.fromIterable(iterable)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
flowable.subscribe(::println, {it.printStackTrace()}, {println("completed")})
iterable.add(4)
Thread.sleep(1000)
iterable.add(5)
Thread.sleep(1000)
This prints:
1 2 3 4 completed
I checked the source of the Flowable interface, but it seems that I can't explicitly signal that a Flowable is complete. How can I do so? In my program I publish events which have some delay between them, and I would like to be explicit about when to complete the event flow.
Clarification: I have a long-running process which emits events. I gather them in a queue and I expose a method which returns a Flowable that wraps my queue. The problem is that there might already be elements in the queue when I create the Flowable. I will process the events only once, and I know when the flow of events stops, so I know when I need to complete the Flowable.
Upvotes: 5
Views: 4229
Reputation: 5521
As suggested by akarnokd, ReplayProcessor does exactly what you want. Replace iterable.add(item) with processor.onNext(item), and call processor.onComplete() when you are done.
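A minimal sketch of that replacement, assuming RxJava 2 on the classpath (the `io.reactivex` packages; in RxJava 3 they live under `io.reactivex.rxjava3`). The function name `collectFromReplayProcessor` and the `received` list are only there for illustration. Everything runs on one thread here, so the order is deterministic; if several threads call `onNext`, those calls would need to be serialized.

```kotlin
import io.reactivex.processors.ReplayProcessor

// Hypothetical helper: pushes events through a ReplayProcessor and records what a subscriber sees.
fun collectFromReplayProcessor(): List<String> {
    val received = mutableListOf<String>()

    // An unbounded ReplayProcessor caches every item, so a subscriber that
    // arrives late still receives the items emitted before it subscribed.
    val processor = ReplayProcessor.create<Int>()

    // Events produced before anyone subscribes (the "already in the queue" case).
    listOf(1, 2, 3).forEach { processor.onNext(it) }

    processor.subscribe(
        { received += it.toString() },
        { it.printStackTrace() },
        { received += "completed" }
    )

    // Events produced after subscription, followed by an explicit completion signal.
    processor.onNext(4)
    processor.onNext(5)
    processor.onComplete()

    return received
}

fun main() {
    println(collectFromReplayProcessor().joinToString(" "))
}
```

Because the processor is itself a Flowable, the method that currently wraps the queue could return it directly instead.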
Upvotes: 2
Reputation: 8300
Using .fromIterable is the wrong way to create a Flowable for your use case.
I'm not actually clear on what that use case is, but you probably want to use Flowable.create() or a PublishSubject:
val flowable = Flowable.create<Int>( {
it.onNext(1)
it.onNext(2)
it.onComplete()
}, BackpressureStrategy.MISSING)
val publishSubject = PublishSubject.create<Int>()
val flowableFromSubject = publishSubject.toFlowable(BackpressureStrategy.MISSING)
// This data will be dropped unless something is subscribed to the flowable.
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.onComplete()
Of course, how you deal with back-pressure will depend on the nature of the source of data.
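As one possible illustration of that choice, here is a sketch that uses BackpressureStrategy.BUFFER instead of MISSING, so items are held until the subscriber requests them rather than being passed on with no back-pressure handling at all. It assumes RxJava 2 (`io.reactivex`); the function name `eventsWithExplicitCompletion` and the `1..5` loop are stand-ins for the real event source, not part of the question's code.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical stand-in for the long-running process that produces events.
fun eventsWithExplicitCompletion(): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        for (event in 1..5) {
            // Stop producing if the subscriber has gone away.
            if (emitter.isCancelled) break
            emitter.onNext(event)
        }
        // The producer decides when the stream ends; this is a no-op after cancellation.
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)

fun main() {
    // toList() only emits once onComplete has been signalled.
    val received = eventsWithExplicitCompletion().toList().blockingGet()
    println(received)
}
```

BUFFER is unbounded, so it suits a source that is known to finish. A source that can outrun its consumer indefinitely may be better served by DROP or LATEST.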
Upvotes: 4