sdgfsdh

Reputation: 37045

How do I create an Observable after another has finished, and combine the results?

I have:

I would like to sequence a and p.get() as follows:

This is what I have tried so far (ignoring f and g):

public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> p) {

    final Subject<T> subject = PublishSubject.create();

    a.subscribe(
        subject::onNext,
        subject::onError,
        () -> {
            p.get().subscribe(
                subject::onNext,
                subject::onError,
                subject::onComplete);
        });

    return subject;
}

How should I implement this?

Upvotes: 0

Views: 319

Answers (2)

sdgfsdh

Reputation: 37045

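public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> f) {
    // Share a single subscription to a: pass its items through and, once it
    // completes, switch to the Observable supplied by f.
    // Note: lastOrError() signals NoSuchElementException if a is empty.
    return a.publish(i -> Observable.merge(
        i,
        i.lastOrError().flatMapObservable(last -> f.get())));
}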
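
For comparison, the same sequencing can be sketched with concat and defer, which also works when a is empty. This is a minimal sketch, not a definitive implementation: it assumes RxJava 2 (the io.reactivex package) is on the classpath, and the class name SequenceSketch and the main method are hypothetical, added only for illustration.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.function.Supplier;

// Hypothetical wrapper class, used only to make the sketch self-contained.
public class SequenceSketch {

    // Emits every item of a, then every item of the Observable returned by p.
    public static <T> Observable<T> sequence(
            final Observable<? extends T> a,
            final Supplier<Observable<? extends T>> p) {
        // concat subscribes to its second source only after the first completes;
        // defer postpones the call to p.get() until that subscription happens.
        return Observable.concat(a, Observable.defer(() -> p.get()));
    }

    public static void main(String[] args) {
        Observable<Integer> combined =
            sequence(Observable.just(1, 2), () -> Observable.just(3));

        List<Integer> items = combined.toList().blockingGet();
        System.out.println(items); // prints [1, 2, 3]
    }
}
```

Because of defer, p.get() is not called when sequence is invoked, only when a subscriber has received the completion of a. An error from a skips the second Observable entirely.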

Upvotes: 0

Alberto S.

Reputation: 7649

I don't have an IDE at hand right now, so I'm not sure that this code actually compiles, but the idea is this:

a                               // your first observable
    .map(f::apply)              // map first result to R
    .flatMap(r1 -> p.get()      // "concat" second observable
        .map(g::apply)          // map second result to R
        .map(r2 -> {
            // some kind of operation between r1 and r2
        })
    )
    .subscribe(next -> {
        // do something with value
    }, error -> {
        // error from either observable
    }, () -> {
        // completed!
    });

If the f computation is expensive and you only want to perform it when the second observable doesn't fail, you can change it to:

a                               // your first observable
    .flatMap(r1 -> p.get()      // "concat" second observable
        .map(g::apply)          // map second result to R
        .map(r2 -> {
            R valueFromFirstObservable = f.apply(r1);
            // some kind of operation between r1 and r2
        })
    )
    .subscribe(next -> {
        // do something with value
    }, error -> {
        // error from either observable
    }, () -> {
        // completed!
    });

If you need the first observable to finish completely before starting the second one, but you require all items from the first one, you can use toList():

a                               // your first observable
    .map(f::apply)              // map first result to R
    .toList()                   // by converting to a List you are forcing the observable to finish before continuing
    .flatMap(r1Items -> p.get() // "concat" second observable
        .map(g::apply)          // map second result to R
        .toList()               // wait until p.get() finishes. To emit for every value instead, remove this line and use flatMapObservable above
        .map(r2Items -> {
            // Some kind of operation between r1Items and r2Items 
            // Beware that now they are not of type R but List<R>
        })
    )
    .subscribe(result -> {
        // do something with the combined value
    }, error -> {
        // error from either observable
    });                         // in RxJava 2+, toList() yields a Single, so there is no separate completion callback

Upvotes: 1
