wasyl
wasyl

Reputation: 3516

Subscribe to existing observable unless it's completed

I've had the need for such a construct several times already, and I don't quite know how to approach it. My problem is: when A happens, I want to create a complex observable (made by combining few operators). It will asynchronously complete some action, publish the result and complete. Meanwhile I want to allow new subscriptions to this observable, but as soon as it has completed, new observable should be created, which is a copy of the first one (or just does the same thing).

(edit) As an example let's have a simple observable: Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS). I aim for the following behaviour:

[milliseconds: action]

0: obs.subscribe(...) - I would like this observable to complete after ~1s

500: obs.subscribe(...) - this one should complete after ~500ms

950: As above, should complete after 50ms

1500: Original observable should have already completed. I would like now to start everything anew, and have subscription here complete after 1s

2000: Here I would like to connect to connect to freshest observable and expect it to complete after 500s (because new second started counting at 1500)

I don't quite know how to do it in proper and thread-safe way. Can I do it with one observable?

Upvotes: 1

Views: 206

Answers (1)

akarnokd
akarnokd

Reputation: 70007

You can use defer and share to achieve this.

Observable<Long> o = Observable.defer(() ->
    Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();

o.subscribe(System.out::println);   // T = 0
Thread.sleep(500);
o.subscribe(System.out::println);  // T = 500
Thread.sleep(450);
o.subscribe(System.out::println);   // T = 950

Thread.sleep(550);
o.subscribe(System.out::println);   // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println);   // T == 2000

Thread.sleep(1000);

The first 3 will complete at the same time (with the same value) after 1s and the second two will complete 1.5s after the first ones (with different value to the first).

Upvotes: 2

Related Questions