Alex

Reputation: 3013

Create one observable from several tasks

I want to create a method that accepts an S and returns an Observable<T> that completes with a single value after three (async) tasks are done.

The three tasks all run on Queues with their own consumers in separate threads.

Here's what I was thinking of:

Task 1, 2 and 3 can run independently, and complete in random order.

Now I have three Subjects: one of T and two of Void. I want to return the first one, but only let it emit the value T once all tasks are done, because I don't want any subscriber of the observable to do anything with T until all tasks have completed.

What is the correct way to combine the Subjects to achieve this behavior? I can easily hack this together using CountDownLatch etc., but I was hoping there's an Rx-native way to tackle this.

And is my plan to use Subjects as callbacks via the queue the right approach? I used to use CompletableFuture<T> for this, but I want to migrate to Rx.
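To make the target semantics concrete, here is a minimal sketch of the same gating behavior written with plain CompletableFuture, which the question mentions as the previous approach. It is not the asker's actual code: the class name GatedResult, the method combine, and the three task bodies are hypothetical stand-ins (S is taken to be String and T to be Integer), and the tasks run on the common pool rather than on dedicated queues.

```java
import java.util.concurrent.CompletableFuture;

class GatedResult {
    // Hypothetical stand-ins for the three queue-backed tasks in the question.
    static Integer task1(String s) { return s.length(); } // produces the value T
    static void task2(String s) { /* side effect only */ }
    static void task3(String s) { /* side effect only */ }

    // Completes with task1's value, but only after task2 and task3 have also finished.
    static CompletableFuture<Integer> combine(String s) {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> task1(s));
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> task2(s));
        CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> task3(s));
        // allOf completes once all three futures are done, in whatever order they finish,
        // so f1.join() inside thenApply does not block.
        return CompletableFuture.allOf(f1, f2, f3).thenApply(ignored -> f1.join());
    }

    public static void main(String[] args) {
        System.out.println(combine("abc").join());
    }
}
```

Whatever Rx construction is chosen should behave the same way: the value from the first task is held back until the other two have signalled completion, and an error in any task fails the combined result.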

Upvotes: 3

Views: 563

Answers (3)

Michael de Jong

Reputation: 186

You could write your own Subject that does this for you, but that's typically discouraged for a number of reasons. Instead, you can concatenate the three Observables/Subjects produced by your tasks. This operation produces an Observable which emits all values produced by these tasks, and only completes when all of the input Observables have completed.

Since their element types differ, you'll need to convert the Observables produced by the latter two tasks to Observable<T> using map().

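// The explicit <T> makes map() yield an Observable<T>; ignoreElements() then drops every item.
Observable<T> output = Observable.concat(t1, 
    t2.<T>map(in -> null).ignoreElements(), 
    t3.<T>map(in -> null).ignoreElements());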

If you want subscribers to receive the produced value only after all Observables have completed, you can call the last() method on this Observable.

// last() holds back the single value until the concatenated Observable completes.
Observable<T> output = Observable.concat(t1, 
    t2.<T>map(in -> null).ignoreElements(), 
    t3.<T>map(in -> null).ignoreElements()).last();

Upvotes: 0

Dave Moten

Reputation: 12087

Don't use Subjects for this. Instead, merge the observables and run each one on an asynchronous scheduler. Say you have:

T task1(S s);
void task2(S s);
void task3(S s);

Then

<S,T> Observable<T> get(S s) {
    return Observable.merge(
        Observable.just(s)
            .map(x -> task1(x))
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task2(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task3(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()))
        // wait for completion before emitting the single value
        .last();
}   

Upvotes: 2

paulpdaniels

Reputation: 18663

I'm not entirely sure where the subject is coming into play, but you can synchronize the three tasks by using When + And + Then:

public IObservable<T> MyMethod<S, T>(S incoming) {

  //Create a new plan
  return Observable.When(

   //Start with Task one which will return a T from an S
   Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))

   //Add in Task two which returns a System.Reactive.Unit
   .And(Observable.FromAsync(() => /*Do Task 2*/))

   //Same for Task 3
   .And(Observable.FromAsync(() => /*Do Task 3*/))

   //Only emit the item from the first Task.
   .Then((ret, _, __) => ret))

   //Finally we only want this to process once, then we will reuse the
   //existing value for subsequent subscribers
   .PublishLast().RefCount();
}

The above will wait until all three tasks have completed before it emits. One thing to note is that in Rx the Void object is really a System.Reactive.Unit, so you should return that if there is no value.

Upvotes: 2
