Reputation: 3767
The delay
operator delays all items by the specified amount of time. I would like to delay and buffer items only for the first N seconds. After N seconds there should be no delay. I need to do this in the following code.
private Emitter<Work> workEmitter;
// In the constructor.
Flowable.create(
(FlowableOnSubscribe<Work>) emitter -> workEmitter = emitter.serialize(),
BackpressureStrategy.BUFFER)
.observeOn(Schedulers.from(executor))
.subscribe(work -> process(work));
// On another thread, as work comes in, ...
workEmitter.onNext(t);
What I want to do is postpone processing of work during the first N seconds, but not after that. I tried delaySubscription, but it leaves workEmitter
as null
during the delayed period. The reason I want to do this is to make the CPUs available for other important work during the initial period.
Upvotes: 0
Views: 524
Reputation: 70007
You could use a UnicastProcessor
and subscribe to it after some delay:
FlowableProcessor<Work> processor = UnicastProcessor.<Work>create().toSerialized();
processor.delaySubscription(N, TimeUnit.SECONDS)
.observeOn(Schedulers.from(executor))
.subscribe( work -> process(work));
// On another thread, as work comes in, ...
processor.onNext(t);
The UnicastProcessor
will keep buffering work items until the delaySubscription
's time has elapsed and then switches to it.
Upvotes: 1
Reputation: 8227
You can delay creation of the observable and then subscribe to it.
Observable.timer( N, SECONDS )
.flatMap( ignored -> Flowable.create(
(FlowableOnSubscribe<Work>) emitter -> workEmitter = emitter.serialize(),
BackpressureStrategy.BUFFER)
.observeOn(Schedulers.from(executor)))
.subscribe( work -> process(work));
This will not start the observer chain until the N seconds has passed.
Upvotes: 1