Reputation: 3516
I've needed such a construct several times already, and I don't quite know how to approach it. My problem is: when A happens, I want to create a complex observable (made by combining a few operators). It will asynchronously perform some action, publish the result and complete. Meanwhile I want to allow new subscriptions to this observable, but as soon as it has completed, a new observable should be created, which is a copy of the first one (or just does the same thing).
(edit) As an example, let's have a simple observable: Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS). I aim for the following behaviour:
[milliseconds: action]
0: obs.subscribe(...)
- I would like this observable to complete after ~1s
500: obs.subscribe(...)
- this one should complete after ~500ms
950: As above, should complete after 50ms
1500: The original observable should have already completed. I would now like to start everything anew, and have the subscription here complete after ~1s
2000: Here I would like to connect to the freshest observable and expect it to complete after ~500ms (because the new second started counting at 1500)
I don't quite know how to do this in a proper and thread-safe way. Can I do it with one observable?
Upvotes: 1
Views: 206
Reputation: 70007
You can use defer and share to achieve this.
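// defer builds a fresh inner observable on every (re)connection;
// share lets concurrent subscribers reuse the one that is in flight.
Observable<Long> o = Observable.defer(() ->
        Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
    .share();
o.subscribe(System.out::println); // T = 0, starts the first run
Thread.sleep(500);
o.subscribe(System.out::println); // T = 500, joins the first run
Thread.sleep(450);
o.subscribe(System.out::println); // T = 950, joins the first run
Thread.sleep(550);
o.subscribe(System.out::println); // T = 1500, first run is done, starts a second run
Thread.sleep(500);
o.subscribe(System.out::println); // T = 2000, joins the second run
Thread.sleep(1000);               // wait for the second run to emit at T ~ 2500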
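The first three subscribers complete at the same time (with the same value), about 1s after the first subscription. The last two share a fresh source started at T = 1500, so they complete together at about T = 2500, i.e. 1.5s after the first group, and with a different value. This works because share (which is publish().refCount()) reconnects to the source when a subscriber arrives after the previous run has terminated, and defer creates a new inner observable on each reconnection.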
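If it helps to see the "join the run in flight, otherwise start a new one" semantics spelled out, here is a minimal sketch in plain Java using CompletableFuture instead of RxJava. It is not how share is implemented; the class SharedTask and its method getOrStart are hypothetical names made up for this illustration, and the sketch assumes the task is described by a Supplier that returns a new future each time it is called (playing the role of defer).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: callers join the run in flight, or start a new run if none is active. */
final class SharedTask<T> {
    // Called once per run to create the actual asynchronous work.
    private final Supplier<CompletableFuture<T>> factory;
    // The most recent run; null before the first call.
    private final AtomicReference<CompletableFuture<T>> current = new AtomicReference<>();

    SharedTask(Supplier<CompletableFuture<T>> factory) {
        this.factory = factory;
    }

    CompletableFuture<T> getOrStart() {
        while (true) {
            CompletableFuture<T> existing = current.get();
            if (existing != null && !existing.isDone()) {
                return existing; // a run is still in flight: share it
            }
            // No active run: publish a fresh future first, so that exactly one
            // thread wins the compare-and-set and invokes the factory.
            CompletableFuture<T> fresh = new CompletableFuture<>();
            if (current.compareAndSet(existing, fresh)) {
                try {
                    factory.get().whenComplete((value, error) -> {
                        if (error != null) {
                            fresh.completeExceptionally(error);
                        } else {
                            fresh.complete(value);
                        }
                    });
                } catch (RuntimeException e) {
                    fresh.completeExceptionally(e); // the factory itself failed
                }
                return fresh;
            }
            // Another thread installed a run in the meantime: loop and re-check.
        }
    }
}
```

For the timeline in the question, the factory could be something like () -> CompletableFuture.supplyAsync(System::currentTimeMillis, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)) (Java 9+). Calls to getOrStart() at 0, 500 and 950 ms would then return the same future, and a call at 1500 ms would start a new one.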
Upvotes: 2