Reputation: 18120
I have a bunch of observables that I'm running/subscribing to in a method (about 9 to be exact), that itself returns an observable. I narrowed it down to 2 observables for the purpose of this question. Here is my method that I created that returns an Observable that contains the other observables.
public static Observable<MyCustomObject> runAll(Service networkService) {
return Observable.create(subscriber -> {
networkService.getOne().subscribe(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
subscriber.onNext(case);
}, exception -> {
throw new OnErrorNotImplementedException(exception);
});
networkService.getTwo().subscribe(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
subscriber.onNext(case);
}, exception -> {
throw new OnErrorNotImplementedException(exception);
});
subscriber.onComplete();
});
}
I then use the Observable that's returned...
runAll(networkService)
.subscribeOn(Schedulers.io())
.subscribe(case -> {
//do stuff
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//handle error
}
});
I'm not sure if I'm creating an observable correctly. I'm essentially replacing a listener/interface that I had here before with an Observable, but I know using Observable.create() is something that is not easy. Should I do it another way? Is there anything wrong with doing what I'm doing?
EDIT: getOne() and getTwo() are network calls so they return Observables.
EDIT 2: Currently have this
public static Observable<MyCustomObject> quickRun(Service networkService) {
return Observable.concat(
networkService.getOne().map(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
return case;
}),
networkService.getTwo().map(response -> {
Request request = response.raw().request();
MyCustomObject case = new MyCustomObject(request);
return case;
})
);
}
Upvotes: 0
Views: 1812
Reputation: 10267
Yes, you should do it in a different way.
Whenever you're treating Observable like regular async callback this is a smell that you are not acting 'reactive'-ly.
As you said Observable.create()
is non-trivial way with some pitfalls for creating Observable, that being said, you probably use older version of RxJava 1, with newer version (1.3+ I think), and with RxJava2, create is based on emitter and is safer. you can read here about the pitfalls of Create and the emitter approach. (as a side note with RxJava2 has another way of extending Observable
).
All of those ways are to bridge between the async callback world to Reactive world, and wrap it any kind of async operation with Observable
.
As for your case, as you have already Observables at your hand, what you need is to compose them together to single stream - Observable.
Rx has many operators for this purpose, according to your example Observable.merge
seems the appropriate operator, all your Observable
will run in asynchronously (note, you will need to apply to each of them the IO scheduler for that), and each Observable
will emit its result on the merged stream, when all of the Observable will finish - onCompleted will be called on the merged Observable
, which is by the way wrong at your example as it's called at the very start just after you've been fire all tasks.
Observable.merge(
networkService.getOne()
.subscribeOn(Schedulers.io()),
networkService.getTwo()
.subscribeOn(Schedulers.io())
)
.map(response -> {
Request request = response.raw().request();
return new MyCustomObject(request);
})
.subscribe(customObject -> {
//do stuff
}, throwable -> {
//handle error
}
);
Upvotes: 1