apricot

Reputation: 3767

How to delay items, but only once in the beginning?

The delay operator delays all items by the specified amount of time. I would like to delay and buffer items only for the first N seconds. After N seconds there should be no delay. I need to do this in the following code.

private Emitter<Work> workEmitter;

// In the constructor.
Flowable.create(
        (FlowableOnSubscribe<Work>) emitter -> workEmitter = emitter.serialize(),
        BackpressureStrategy.BUFFER)
    .observeOn(Schedulers.from(executor))
    .subscribe(work -> process(work));

// On another thread, as work comes in, ...
workEmitter.onNext(t);

What I want to do is postpone processing of work during the first N seconds, but not after that. I tried delaySubscription, but workEmitter stays null for the whole delay period. The reason I want to do this is to keep the CPUs available for other important work during the initial period.

Upvotes: 0

Views: 524

Answers (2)

akarnokd

Reputation: 70007

You could use a UnicastProcessor and subscribe to it after some delay:

FlowableProcessor<Work> processor = UnicastProcessor.<Work>create().toSerialized();

processor.delaySubscription(N, TimeUnit.SECONDS)
    .observeOn(Schedulers.from(executor))
    .subscribe(work -> process(work));

// On another thread, as work comes in, ...
processor.onNext(t);

The UnicastProcessor buffers work items until the delaySubscription time has elapsed, then replays the buffered items to the subscriber and passes later items through without further delay.
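
To make the buffering behaviour concrete, here is a minimal sketch of the same idea using only java.util.concurrent. It is an analogue for illustration, not how RxJava implements UnicastProcessor or delaySubscription. The class name DelayedStartDemo, its methods, and the String work items are all hypothetical. Producers can submit from the start, and the single consumer begins draining only after the initial delay.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a plain-JDK analogue, not RxJava's implementation.
class DelayedStartDemo {
    // Unique sentinel, compared by identity, that tells the consumer to stop.
    private static final String STOP = new String("STOP");

    // Unbounded buffer: producers never block while the consumer is still waiting.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    DelayedStartDemo(long delayMillis) {
        // The consumer loop is started once, after the initial delay.
        executor.schedule(this::drain, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Safe to call from any thread, before or after the delay has elapsed.
    void submit(String work) {
        buffer.add(work);
    }

    int processedCount() {
        return processed.size();
    }

    // Signals the consumer to stop, waits for it, and returns what it processed.
    List<String> finish() {
        buffer.add(STOP);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    private void drain() {
        try {
            // Buffered items come out first, in order; later items are taken as they arrive.
            for (String work = buffer.take(); work != STOP; work = buffer.take()) {
                processed.add(work);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> runDemo(long delayMillis) {
        DelayedStartDemo demo = new DelayedStartDemo(delayMillis);
        demo.submit("a"); // buffered: the consumer has not started yet
        demo.submit("b");
        try {
            Thread.sleep(delayMillis * 2); // let the initial delay elapse
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        demo.submit("c"); // taken by the already-running consumer
        return demo.finish();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(200)); // prints [a, b, c]
    }
}
```

As in the processor version, the producer side is usable immediately, so nothing is lost and nothing is null during the delay. Only the start of consumption is postponed.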

Upvotes: 1

Bob Dalgleish

Reputation: 8227

You can delay creation of the observable and then subscribe to it.

// Flowable.timer is used (rather than Observable.timer) so that flatMap can return a Flowable.
Flowable.timer(N, TimeUnit.SECONDS)
    .flatMap(ignored -> Flowable.create(
            (FlowableOnSubscribe<Work>) emitter -> workEmitter = emitter.serialize(),
            BackpressureStrategy.BUFFER)
        .observeOn(Schedulers.from(executor)))
    .subscribe(work -> process(work));

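This will not start the observer chain until the N seconds have passed. Note that workEmitter is only assigned once the inner Flowable is subscribed, so it stays null during the delay.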

Upvotes: 1
