Reputation: 177
I saw this question here. It is about adding a delay to each emitted item. Based on the accepted answer, this is how to do it:
Observable.zip(
        Observable.range(1, 5)
                .groupBy(n -> n % 5)
                .flatMap(g -> g.toList()),
        Observable.interval(50, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    })
    .toList().toBlocking().first();
In that question, the asker specifically asked about a fixed set of items (Observable.range(1, 5)). Unfortunately, this is not what I want to achieve.
I also saw this comment.
That comment describes what I want to achieve. My source observable emits items at a slower (and sometimes faster) rate than the interval. The observable also never completes.
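To make the mismatch concrete, here is a small sketch of the timing arithmetic only (no RxJava involved). It assumes that zip pairs the n-th source item with the n-th interval tick, and that the first tick fires one period after subscription. The class name, the arrival times, and the 400 ms period are made up for illustration, and backpressure is ignored.

```java
import java.util.Arrays;

public class ZipWithIntervalTiming {

    // The n-th zipped value can only be emitted once both the n-th source
    // item and the n-th tick are available, i.e. at the later of the two.
    static long[] zipTimes(long[] arrivalsMillis, long periodMillis) {
        long[] out = new long[arrivalsMillis.length];
        for (int n = 0; n < arrivalsMillis.length; n++) {
            long tickMillis = (n + 1) * periodMillis; // first tick after one period
            out[n] = Math.max(arrivalsMillis[n], tickMillis);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical source: one item, a long pause, then a quick burst.
        long[] arrivals = {0, 2000, 2100, 2200};
        System.out.println(Arrays.toString(zipTimes(arrivals, 400)));
        // prints [400, 2000, 2100, 2200]
    }
}
```

In this model the ticks that went unused during the pause are already waiting when the burst arrives, so the last three items come out only 100 ms apart and the 400 ms minimum is not kept.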
===
So basically I want a hot observable to have a minimum delay between emissions.
For example, if I want a 400ms minimum delay and the observable emits like this:
X1-100ms delay-X2-200ms delay-X3-600ms delay-X4-20000ms delay-X5-...
I want it to yield:
X1-400ms delay-X2-400ms delay-X3-600ms delay-X4-20000ms delay-X5-...
Does anybody have an idea how to achieve that?
Upvotes: 1
Views: 248
Reputation: 4681
Your requirement is rather unusual...
I can solve it, though not elegantly. Here is my code:
class Three<A, B, C> {
    A a;
    B b;
    C c;
    // Getter, Setter, Constructor
}

public static void main(String[] args) throws Exception {
    BehaviorSubject<Integer> s = BehaviorSubject.create();
    // Three = (value, upstream arrival time in ms, scheduled downstream emission time in ms)
    s.map(i -> new Three<>(i, System.currentTimeMillis(), System.currentTimeMillis()))
            .scan((a, b) -> {
                // Schedule b after a by the upstream gap, but never less than 400 ms.
                b.setC(a.getC() + Math.max(400L, b.getB() - a.getB()));
                return b;
            })
            // concatMap preserves order; each value waits until its scheduled time.
            .concatMap(i -> Observable.just(i.getA())
                    .delay(Math.max(0, i.getC() - System.currentTimeMillis()), TimeUnit.MILLISECONDS))
            .subscribe(i -> System.out.println(i + "\t" + System.currentTimeMillis()));

    s.onNext(0);
    Thread.sleep(100);
    s.onNext(1);
    Thread.sleep(200);
    s.onNext(2);
    Thread.sleep(600);
    s.onNext(3);
    Thread.sleep(2000);
    s.onNext(4);
    Thread.sleep(200);
    s.onNext(5);
    Thread.sleep(800);
    s.onNext(6);
    Thread.sleep(1000); // keep the JVM alive until the last delayed value is printed
}
And the output:
0 1510128693984
1 1510128694366 // 400ms
2 1510128694766 // 400ms
3 1510128695366 // 600ms
4 1510128697366 // 2000ms
5 1510128697766 // 400ms
6 1510128698567 // 800ms
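The rule inside scan can also be checked without RxJava or real clocks. The following is only a sketch of that arithmetic: each emission is scheduled at the previous emission time plus the larger of the minimum gap and the upstream gap. The class and method names are hypothetical, and the arrival times are the nominal offsets implied by the Thread.sleep calls above, not measured values.

```java
import java.util.Arrays;

public class MinGapSchedule {

    // emit[0] = arrival[0]
    // emit[n] = emit[n-1] + max(minGap, arrival[n] - arrival[n-1])
    static long[] schedule(long[] arrivalsMillis, long minGapMillis) {
        long[] emit = new long[arrivalsMillis.length];
        for (int n = 0; n < arrivalsMillis.length; n++) {
            if (n == 0) {
                emit[n] = arrivalsMillis[n]; // first item passes through unchanged
            } else {
                long upstreamGap = arrivalsMillis[n] - arrivalsMillis[n - 1];
                emit[n] = emit[n - 1] + Math.max(minGapMillis, upstreamGap);
            }
        }
        return emit;
    }

    public static void main(String[] args) {
        // Nominal arrival offsets: sleeps of 100, 200, 600, 2000, 200, 800 ms.
        long[] arrivals = {0, 100, 300, 900, 2900, 3100, 3900};
        System.out.println(Arrays.toString(schedule(arrivals, 400)));
        // prints [0, 400, 800, 1400, 3400, 3800, 4600]
    }
}
```

The gaps between consecutive scheduled times are 400, 400, 600, 2000, 400, and 800 ms, which matches the comments on the output above.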
Upvotes: 1