HoverPhoenix

Reputation: 177

RxJava Delay for Hot Observables

I saw this question about delaying each emitted item. Based on the accepted answer, this is how to do it:

long timeNow = System.currentTimeMillis();

Observable.zip(
        Observable.range(1, 5)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList()),
        Observable.interval(50, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    })
    .toList().toBlocking().first();

That question is specifically about a fixed, finite source (Observable.range(1, 5)). Unfortunately, that is not what I want to achieve.

I also saw this comment.

That comment describes what I want to achieve: my source observable emits items at a slower (and sometimes faster) rate than the interval, and it never completes.

===

So basically, I want a hot observable to enforce a minimum delay between emitted items.

For example, if I want a 400 ms minimum delay and the observable emits like this:

X1-100ms delay-X2-200ms delay-X3-600ms delay-X4-20000ms delay-X5-...

I want it to yield:

X1-400ms delay-X2-400ms delay-X3-600ms delay-X4-20000ms delay-X5-...

Does anybody have an idea how to achieve that?

Upvotes: 1

Views: 248

Answers (1)

Dean Xu

Reputation: 4681

Your requirement is so strange...

I can solve it, though not elegantly. Here is my code:

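import java.util.concurrent.TimeUnit;
// Assumption: RxJava 1.x packages, since the question uses toBlocking().
// For RxJava 2.x, import from io.reactivex instead.
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class MinDelayDemo {
  // Three = (the value, upstream arrival millis, downstream emit millis)
  static class Three<A, B, C> {
    A a;
    B b;
    C c;
    Three(A a, B b, C c) { this.a = a; this.b = b; this.c = c; }
    A getA() { return a; }
    B getB() { return b; }
    C getC() { return c; }
    void setC(C c) { this.c = c; }
  }

  public static void main(String[] args) throws Exception {
    BehaviorSubject<Integer> s = BehaviorSubject.create();
    s.map(i -> new Three<>(i, System.currentTimeMillis(), System.currentTimeMillis()))
        // Schedule each item at: previous emit time + max(400 ms, upstream gap)
        .scan((a, b) -> {
          b.setC(a.getC() + Math.max(400L, b.getB() - a.getB()));
          return b;
        })
        // concatMap keeps the order; delay only by the time still remaining
        .concatMap(i -> Observable.just(i.getA())
            .delay(Math.max(0, i.getC() - System.currentTimeMillis()), TimeUnit.MILLISECONDS))
        .subscribe(i -> System.out.println(i + "\t" + System.currentTimeMillis()));
    s.onNext(0);
    Thread.sleep(100);
    s.onNext(1);
    Thread.sleep(200);
    s.onNext(2);
    Thread.sleep(600);
    s.onNext(3);
    Thread.sleep(2000);
    s.onNext(4);
    Thread.sleep(200);
    s.onNext(5);
    Thread.sleep(800);
    s.onNext(6);
    // Keep the JVM alive long enough for the last delayed item to be printed
    Thread.sleep(1000);
  }
}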

And the output:

0   1510128693984
1   1510128694366 // 400ms
2   1510128694766 // 400ms
3   1510128695366 // 600ms
4   1510128697366 // 2000ms
5   1510128697766 // 400ms
6   1510128698567 // 800ms
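
The arithmetic done in the scan step can be checked without RxJava. Below is a minimal plain-Java sketch of that rule only: each emit time is the previous emit time plus max(minimum gap, upstream gap). The names MinGapSchedule and emitTimes are hypothetical and exist only for this illustration; the arrival times are the ones from the question's example (X1 to X5), expressed as offsets in milliseconds.

```java
import java.util.Arrays;

public class MinGapSchedule {
    // Hypothetical helper: maps upstream arrival times (ms) to downstream
    // emit times (ms), using the same rule as the scan step in the answer.
    static long[] emitTimes(long[] arrivals, long minGapMillis) {
        long[] emits = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            if (i == 0) {
                // The first item is passed through at its arrival time.
                emits[i] = arrivals[i];
            } else {
                long upstreamGap = arrivals[i] - arrivals[i - 1];
                // Keep the upstream gap, but never go below the minimum gap.
                emits[i] = emits[i - 1] + Math.max(minGapMillis, upstreamGap);
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // X1 at 0, then gaps of 100, 200, 600 and 20000 ms as in the question.
        long[] arrivals = {0, 100, 300, 900, 20900};
        // prints [0, 400, 800, 1400, 21400]
        System.out.println(Arrays.toString(emitTimes(arrivals, 400)));
    }
}
```

The resulting gaps are 400, 400, 600 and 20000 ms, which matches the sequence the question asks for. Note that under this rule an item that has been held back also shifts every later item: X4 arrives at 900 ms but is emitted at 1400 ms.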

Upvotes: 1