Reputation: 37045
I have:
Observable<T1> a
Supplier<Observable<T2>> p
Function<T1, R> f
Function<T2, R> g
. I would like to sequence a
and p.get()
as follows:
a
to complete, then call Observable<T2> b = p.get()
a
and b
to type R
using f
and g
Observable<R>
a
or b
fails b
has finished. This is what I have tried so far (ignoring f
and g
):
public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> p) {
final Subject<T> subject = PublishSubject.create();
a.subscribe(
subject::onNext,
subject::onError,
() -> {
p.get().subscribe(
subject::onNext,
subject::onError,
subject::onComplete);
});
return subject;
}
How should I implement this?
Upvotes: 0
Views: 319
Reputation: 37045
public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> f) {
return a.publish(i -> Observable.merge(
i,
i.lastOrError().flatMapObservable(f::apply));
}
Upvotes: 0
Reputation: 7649
Right now I don't have an IDE so I'm not sure that this code actually compiles. But the idea is this:
a // your first observable
.map(f::apply) // map first result to R
.flatMap(r1 -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.map(r2 -> {
// some kind of operation between r1 and r2
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
}, () -> {
// completed!
});
If the f
computation is quite expensive and you only want to do it if second observable doesn't fail you can change it to
a // your first observable
.flatMap(r1 -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.map(r2 -> {
R valueFromFirstObservable = f.apply(r1);
// some kind of operation between r1 and r2
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
}, () -> {
// completed!
});
If you need the first observable to completely finish before starting the second one bu you require all items for the first one you can use toList()
:
a // your first observable
.map(f::apply) // map first result to R
.toList() // by converting to a List you are forcing the observable to finish before continuing
.flatMap(r1Items -> p.get() // "concat" second observable
.map(g::apply) // map result result to R
.toList() // wait until p.get() finishes. Remove this line if you want to emit for all values
.map(r2Items -> {
// Some kind of operation between r1Items and r2Items
// Beware that now they are not of type R but List<R>
})
)
.subscribe(next -> {
// do something with value
}, error -> {
// error from either observable
}, () -> {
// completed!
});
Upvotes: 1