flakes

Reputation: 23614

Transform observable to have a minimum delay between values

I'm looking for a good solution to this question, but implemented in RxJava. The question is also over five years old, so I'm wondering: is there a better way to achieve this output?

What I'm trying to achieve is to buffer incoming events from some IObservable (they come in bursts) and release them further, but one by one, at even intervals. Like this:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

The biggest requirement for me is that no events are lost and that the ordering of events remains the same.

Upvotes: 1

Views: 490

Answers (1)

akarnokd

Reputation: 69997

This particular pattern requires remembering when the last event was scheduled, so that if the next event arrives after the period has elapsed, it can be emitted immediately and start a new periodic emission. Perhaps an easier and more efficient way is to write a custom operator (shown here for RxJava 1.x):

import java.util.concurrent.TimeUnit;

import rx.*;
import rx.Observable.Operator;
import rx.schedulers.Schedulers;

public class SpanOut<T> implements Operator<T, T> {
    final long time;

    final TimeUnit unit;

    final Scheduler scheduler;

    public SpanOut(long time, TimeUnit unit, Scheduler scheduler) {
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> t) {
        Scheduler.Worker w = scheduler.createWorker();

        SpanSubscriber<T> parent = new SpanSubscriber<>(t, unit.toMillis(time), w);

        t.add(w);
        t.add(parent);

        return parent;
    }

    static final class SpanSubscriber<T> extends Subscriber<T> {
        final Subscriber<? super T> actual;

        final long spanMillis;

        final Scheduler.Worker worker;

        long lastTime;

        public SpanSubscriber(Subscriber<? super T> actual, 
                long spanMillis, Scheduler.Worker worker) {
            this.actual = actual;
            this.spanMillis = spanMillis;
            this.worker = worker;
        }

        @Override
        public void onNext(T t) {
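            long now = worker.now();
            // lastTime is the earliest time the next item may be emitted; once it
            // has passed, emit immediately and start a new span from now.
            if (now >= lastTime) {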
                lastTime = now + spanMillis;
                worker.schedule(() -> {
                    actual.onNext(t);
                });
            } else {
                long next = lastTime - now;
                lastTime += spanMillis;
                worker.schedule(() -> {
                    actual.onNext(t);
                }, next, TimeUnit.MILLISECONDS);
            }
        }

        @Override
        public void onError(Throwable e) {
            worker.schedule(() -> {
                actual.onError(e);
                unsubscribe();
            });
        }

        @Override
        public void onCompleted() {
            long next = lastTime - worker.now();
            worker.schedule(() -> {
                actual.onCompleted();
                unsubscribe();
            }, next, TimeUnit.MILLISECONDS);
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

    public static void main(String[] args) {
        Observable.range(1, 5)
        .concatWith(Observable.just(6).delay(6500, TimeUnit.MILLISECONDS))
        .concatWith(Observable.range(7, 4))
        .lift(new SpanOut<>(1, TimeUnit.SECONDS, Schedulers.computation()))
        .timeInterval()
        .toBlocking()
        .subscribe(System.out::println);
    }
}
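
The timing rule the operator is meant to follow can be checked without RxJava. Below is a minimal sketch in plain Java, assuming arrival times are given in ascending milliseconds; the class name SpanOutTimes and the method emissionTimes are made up for illustration and are not part of RxJava. An item is emitted on arrival if the previous span has already elapsed, otherwise at the next free slot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: models the slot bookkeeping
// with plain numbers instead of a Scheduler.Worker.
class SpanOutTimes {

    /**
     * Returns the emission time of each item, given ascending arrival
     * times (ms) and the minimum gap between emissions (ms).
     */
    static List<Long> emissionTimes(long[] arrivals, long spanMillis) {
        List<Long> out = new ArrayList<>();
        // Earliest time the next item may be emitted; nothing is reserved yet.
        long nextSlot = Long.MIN_VALUE;
        for (long now : arrivals) {
            // Emit on arrival if the slot is free, otherwise wait for it.
            long emitAt = Math.max(now, nextSlot);
            out.add(emitAt);
            // Reserve the following slot one span after this emission.
            nextSlot = emitAt + spanMillis;
        }
        return out;
    }

    public static void main(String[] args) {
        // A burst of three items at t=0, then two more around t=5s.
        long[] arrivals = {0, 0, 0, 5000, 5100};
        System.out.println(emissionTimes(arrivals, 1000));
        // prints [0, 1000, 2000, 5000, 6000]
    }
}
```

Items keep their order and none are dropped; a burst is stretched out, and an item arriving after a quiet period goes through right away.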

Upvotes: 6
