Reputation: 3013
I want to create a method that accepts a S
and returns an Observable<T>
that is completed with one value after three (async) tasks are done.
The three tasks all run on Queues with their own consumers in separate threads.
Here's what I was thinking of:
<S, Subject<T>>
via the queue and computes the value from S
and sets in in the Subject<T>
with onNext
and then calls onComplete()
.<S, Subject<Void>>
and call onComplete()
once they've done their job. Task 1, 2 and 3 can run independently, and complete in random order.
Now I have three Subjects, One of T
, and two of Void
. I want to return the first one, but only let it emit the value T
once all tasks are done. This is because I don't want any subscribers of the observable do anything with T
until all tasks are completed.
What is the correct way to combine the Subjects to achieve this behavior? I can easily hack this together using CountDownLatch
etc but was hoping there's a rx-native kind of way to tackle this.
And is my plan to use Subjects as callbacks via the queue the right approach? I used to use CompletableFuture<T>
for this, but I want to migrate to RX.
Upvotes: 3
Views: 563
Reputation: 186
You could write your own Subject
that does this for you but that's typically discouraged for a number of reasons. Instead you can also concatenate the three Observables/Subjects produced by your tasks. This operation produces an Observable
which emits all values produced by these tasks, and only completes when all of the input Observables have completed.
Since their types slightly differ, you'll need to alter the signature of the Observables produced by the latter two tasks using map()
.
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements());
If you want to wait for any subscribers to use the produced value until all Observables have completed, you can call the last()
method on this Observable
.
Observable<T> output = Observable.concat(t1,
t2.map(in -> null).ignoreElements(),
t3.map(in -> null).ignoreElements()).last();
Upvotes: 0
Reputation: 12087
Don't use Subject
s for this. Rather merge the observables using asynchronous schedulers. So you have:
T task1(S s);
void task2(S s);
void task3(S s);
Then
<S,T> Observable<T> get(S s) {
return Observable.merge(
Observable.just(s)
.map(x -> task1(x))
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task2(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()),
(Observable<T>) Observable.just(s)
.doOnNext(x -> task3(x))
.ignoreElements()
.cast(Object.class)
.subscribeOn(Schedulers.computation()))
// wait for completion before emitting the single value
.last();
}
Upvotes: 2
Reputation: 18663
I'm not entirely sure where the subject is coming into play but you can synchronize the three Tasks by using When
+ And
+ Then
public IObservable<T> MyMethod<S, T>(S incoming) {
//Create a new plan
return Observable.When(
//Start with Task one which will return an T from an S
Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))
//Add in Task two which returns a System.Reactive.Unit
.And(Observable.FromAsync(() => /*Do Task 2*/))
//Same for Task 3
.And(Observable.FromAsync(() => /*Do Task 3*/))
//Only emit the item from the first Task.
.Then((ret, _, __) => ret))
//Finally we only want this to process once, then we will reuse the
//existing value for subsequent subscribers
.PublishLast().RefCount();
}
The above will wait until all three items are completed before it emits. One thing to note is that in Rx the Void
object is really a System.Reactive.Unit
, so you should be returning that if there is no value.
Upvotes: 2