Reputation: 1933
FIXED to yield the desired output!
I have a list of items that I need to iterate and between each element I would like to wait 2 seconds. when all items are processed and subscribe is called then I would like to wait another 2 seconds.
Here is my code:
rx.Observable.from(new String[] {"Test1", "Test2", "Test3", "Test4"})
.zipWith(rx.Observable.interval(2000, 1000,TimeUnit.MILLISECONDS), (a,b) -> a)
.subscribe(name -> {
System.out.println(name);
}, e -> {
System.out.println("ERROR" + e);
}, () -> {
rx.Observable.timer(4000, TimeUnit.MILLISECONDS)
.subscribe(notUsed -> "THE END!");
});
try {
// Sleep so the program doesn't exit immediately
Thread.sleep(10000);
}
catch (Exception e) {
}
It runs pretty nice, but when it performs the 'interval' in the subscribe it prints "THE END!" three times !
TEST1
TEST2
TEST3
TEST4
THE END!
THE END!
THE END!
Any idea how to improve this?
Thank you!
Upvotes: 0
Views: 100
Reputation: 869
Well, it does exactly what you tell it to do :-)
time event
-------------------------------------
0 start, wait 2000 ms initially
2000 emit "Test1", wait 1000 ms
3000 emit "Test2", wait 1000 ms
4000 emit "Test3", wait 1000 ms
5000 emit "Test4", emit the OnCompleted event, wait 2000 ms
7000 print "THE END!", wait 1000 ms
8000 print "THE END!", wait 1000 ms
9000 print "THE END!", wait 1000 ms
10000 main thread dies
I don't know exactly what you want, but I presume you want to have just a single "THE END!". If you want to have this to be delayed by 1000 ms as well, you have to do the following:
Observable.from(new String[] {"Test1", "Test2", "Test3", "Test4"})
.zipWith(Observable.interval(2000, 1000,TimeUnit.MILLISECONDS), (a,b) -> a)
.concatWith(Observable.timer(1000, TimeUnit.MILLISECONDS).ignoreElements())
.subscribe(
System.out::println,
e -> System.out.println("ERROR" + e),
() -> System.out.println("THE END!));
If you don't want the delay at the end, just remove the concat
above.
What this concat
does is it waits for the first sequence (from
and zip
) to complete and subscribe to the Observer
once the OnCompleted
from the first sequence is emitted. Once the second sequence is subscribed, it waits 1000 ms, emits an element and completes (because it is a timer
rather than an interval
). Because you ignore the elements (a.k.a. the OnNext
s) you are only left with the OnCompleted
event, which triggers the callback in the subscribe
.
Upvotes: 2
Reputation: 1960
Once your first stream (Test1-4) is completed, you can concat it with Observable
emitting single value after delay Observable.timer(2, TimeUnit.SECONDS).map(notUsed -> "The end")
Copy pastable code
rx.Observable.from(new String[]{"Test1", "Test2", "Test3", "Test4"})
.zipWith(rx.Observable.interval(2000, 1000, TimeUnit.MILLISECONDS), (a, b) -> a)
.concatWith(Observable.timer(2, TimeUnit.SECONDS).map(notUsed -> "The end"))
.subscribe(name -> {
System.out.println(name);
}, e -> {
System.out.println("ERROR" + e);
}
);
try {
// Sleep so the program doesn't exit immediately
Thread.sleep(10000);
} catch (Exception e) {
}
Upvotes: 1