Reputation: 311
I'd like to transform a Flowable
so that it defers emitting items until a specified number of items is collected, and then emits them to downstream in FIFO order maintaining constant delayed item count. Once upstream signals onComplete, the queued items should be flushed to downstream before emitting onComplete:
(in this example delayed item number is 3)
1 2 3 4 5 6 7 |
1 2 3 4 5 6 7 |
I don't see any existing operators that do this or can be modified to get that behavior. Observable.delay
seems to support only time-based delay, not count-based delay.
It should be easy to implement a custom operator to achieve this, but maybe there's a simpler way with existing operators?
Upvotes: 1
Views: 228
Reputation: 70007
You can publish a sequence, skip the last N, then append the last N back:
Flowable.range(1, 7)
.flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
.publish(f ->
f.skipLast(3).mergeWith(f.takeLast(3))
)
// -------------------------------------------------------------------
.blockingSubscribe(v -> System.out.println("<-- " + v));
Upvotes: 2