Shvalb

Reputation: 1933

Delay between iterated elements and on subscribe

FIXED to yield the desired output!

I have a list of items to iterate over, and I would like to wait 2 seconds between elements. When all items have been processed and the completion handler passed to subscribe is called, I would like to wait another 2 seconds.

Here is my code:

rx.Observable.from(new String[] {"Test1", "Test2", "Test3", "Test4"})
        .zipWith(rx.Observable.interval(2000, 1000,TimeUnit.MILLISECONDS), (a,b) -> a)
        .subscribe(name -> {
            System.out.println(name);
        }, e -> {
            System.out.println("ERROR" + e);
        }, () -> {
            rx.Observable.timer(4000, TimeUnit.MILLISECONDS)
                         .subscribe(notUsed -> System.out.println("THE END!"));
        });


        try {
            // Sleep so the program doesn't exit immediately
            Thread.sleep(10000);
        }
        catch (Exception e) {

        }

It runs pretty well, but when it performs the 'interval' in the subscribe, it prints "THE END!" three times!

TEST1
TEST2
TEST3
TEST4
THE END!
THE END!
THE END!

Any idea how to improve this?

Thank you!

Upvotes: 0

Views: 100

Answers (2)

RvanHeest

Reputation: 869

Well, it does exactly what you tell it to do :-)

 time   event
-------------------------------------
    0   start, wait 2000 ms initially
 2000   emit "Test1", wait 1000 ms
 3000   emit "Test2", wait 1000 ms
 4000   emit "Test3", wait 1000 ms
 5000   emit "Test4", emit the OnCompleted event, wait 2000 ms
 7000   print "THE END!", wait 1000 ms
 8000   print "THE END!", wait 1000 ms
 9000   print "THE END!", wait 1000 ms
10000   main thread dies
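
The times in this table follow from plain arithmetic on the interval arguments: tick k (counting from 0) fires at initialDelay + k * period, and zipWith pairs the k-th item with the k-th tick. Below is a minimal sketch of that calculation in plain Java, with no RxJava involved. The class and method names are made up for illustration, and it assumes (as the table implies) that the nested call in the completion handler was also an interval with a 2000 ms initial delay and a 1000 ms period, and that the period is greater than zero.

```java
import java.util.ArrayList;
import java.util.List;

public class TimelineSketch {

    // Emission time (ms) of each zipped item: tick k of interval(initialDelay, period)
    // fires at initialDelay + k * period.
    public static List<Long> emissionTimes(int itemCount, long initialDelayMs, long periodMs) {
        List<Long> times = new ArrayList<>();
        for (int k = 0; k < itemCount; k++) {
            times.add(initialDelayMs + k * periodMs);
        }
        return times;
    }

    // Tick times of a second interval started at startMs that fall before deadlineMs
    // (the moment the main thread stops sleeping). Assumes periodMs > 0.
    public static List<Long> ticksBefore(long startMs, long initialDelayMs,
                                         long periodMs, long deadlineMs) {
        List<Long> times = new ArrayList<>();
        for (long t = startMs + initialDelayMs; t < deadlineMs; t += periodMs) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        List<Long> items = emissionTimes(4, 2000, 1000);
        long completedAt = items.get(items.size() - 1); // OnCompleted follows the last item
        List<Long> ends = ticksBefore(completedAt, 2000, 1000, 10000);
        System.out.println("items emitted at: " + items);
        System.out.println("THE END! printed at: " + ends);
    }
}
```

Three ticks of the nested interval fit between the completion at 5000 ms and the end of the 10000 ms sleep, which is where the three "THE END!" lines come from. Real timings will drift by a few milliseconds.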

I don't know exactly what you want, but I presume you want just a single "THE END!". If you want it to be delayed by 1000 ms as well, do the following:

Observable.from(new String[] {"Test1", "Test2", "Test3", "Test4"})
    .zipWith(Observable.interval(2000, 1000,TimeUnit.MILLISECONDS), (a,b) -> a)
    .concatWith(Observable.timer(1000, TimeUnit.MILLISECONDS).ignoreElements())
    .subscribe(
        System.out::println,
        e -> System.out.println("ERROR" + e),
        () -> System.out.println("THE END!"));

If you don't want the delay at the end, just remove the concatWith line above.

What this concat does is wait for the first sequence (from and zip) to complete, and subscribe to the second Observable only once the OnCompleted from the first sequence is emitted. Once the second sequence is subscribed, it waits 1000 ms, emits one element and completes (because it is a timer rather than an interval). Because you ignore the elements (a.k.a. the OnNexts), you are left with only the OnCompleted event, which triggers the completion callback in the subscribe.
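
To make the ordering concrete, here is a rough analogy built only on java.util.concurrent rather than RxJava. It is a sketch, not how concatWith is implemented: the class ConcatDelaySketch and its run method are hypothetical names, and it assumes a non-empty item array. The point it illustrates is that the one-shot delay is scheduled only after the last item has been delivered, and that it fires once (like timer) instead of repeating (like interval).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ConcatDelaySketch {

    // Delivers each item on a schedule, then starts a single extra delay and
    // records "THE END!" exactly once. Returns everything recorded, in order.
    public static List<String> run(String[] items, long initialDelayMs,
                                   long periodMs, long finalDelayMs) {
        if (items.length == 0) {
            throw new IllegalArgumentException("sketch assumes at least one item");
        }
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);

        for (int k = 0; k < items.length; k++) {
            final String item = items[k];
            final boolean last = (k == items.length - 1);
            scheduler.schedule(() -> {
                log.add(item);
                if (last) {
                    // First sequence is complete: only now start the one-shot delay.
                    scheduler.schedule(() -> {
                        log.add("THE END!");
                        done.countDown();
                    }, finalDelayMs, TimeUnit.MILLISECONDS);
                }
            }, initialDelayMs + k * periodMs, TimeUnit.MILLISECONDS);
        }

        try {
            done.await(); // block until the completion step has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // Delays scaled down from 2000/1000/1000 ms so the demo finishes quickly.
        String[] items = {"Test1", "Test2", "Test3", "Test4"};
        for (String line : run(items, 200, 100, 100)) {
            System.out.println(line);
        }
    }
}
```

Blocking on the latch also replaces the fixed Thread.sleep used to keep the main thread alive, since the program waits exactly until the final step has run.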

Upvotes: 2

m.ostroverkhov

Reputation: 1960

Once your first stream (Test1-4) has completed, you can concat it with an Observable that emits a single value after a delay: Observable.timer(2, TimeUnit.SECONDS).map(notUsed -> "The end")

Copy-pastable code:

rx.Observable.from(new String[]{"Test1", "Test2", "Test3", "Test4"})
                .zipWith(rx.Observable.interval(2000, 1000, TimeUnit.MILLISECONDS), (a, b) -> a)
                .concatWith(rx.Observable.timer(2, TimeUnit.SECONDS).map(notUsed -> "The end"))
                .subscribe(name -> {
                            System.out.println(name);
                        }, e -> {
                            System.out.println("ERROR" + e);
                        }
                );


        try {
            // Sleep so the program doesn't exit immediately
            Thread.sleep(10000);
        } catch (Exception e) {

        }

Upvotes: 1
