Reputation: 6778
I'm trying to implement an Observable/Subscriber with RxJava
on the onPostExecute()
of an AsyncTask
and I don't get how to make the connection.
I create the Observable
in the onPostExecute
method. I want MyFragment
to subscribe to this. How do I set this up?
public class LoadAndStoreDataTask extends AsyncTask<String, Integer, String> {
...
@Override
protected void onPostExecute(String result) {
// create the observable
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(result);
subscriber.onCompleted();
}
}
);
myObservable.subscribe(mySubscriber);
}
}
public class MyFragment extends Fragment {
...
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
}
...
}
Upvotes: 2
Views: 1734
Reputation: 9050
Actually RxJava is supposed to replace AsycTask. In fact I can say with confidence that AsyncTask is a subset of RxJava.
In RxJava, a Subscriber would be analogous to AsyncTask.progressUpdate
or onPostExecute
and Observable to the process in doInBackground
. Data are emitted from Observable to Subscriber and any alteration in this stream is done with mapping methods. You probably don't need mapping now so I would reconfigure my RxJava like this:
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try{
String res = ...//your original doInBackground
subscriber.onNext(res);
// onNext would be comparable to AsyncTask.onProgressUpdate
// and usually applies when backgorund process runs a loop
subscriber.onCompleted();
}catch (SomeException e){
// if the process throws an exception or produces a result
// you'd consider error then use onError
subscriber.onError(e);
}
}
}
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // If subscriber runs on UI thread
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String response) {
// result from Observable.onNext. The methods below correspond
// to their Observable counterparts.
}
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
});
AndroidSchedulers
is available in RxAndroid. To use it add this line to build.gradle :
compile 'io.reactivex:rxandroid:0.24.0'
Upvotes: 4