Al Lelopath
Al Lelopath

Reputation: 6778

Observable/Subscriber in AsyncTask

I'm trying to implement an Observable/Subscriber with RxJava on the onPostExecute() of an AsyncTask and I don't get how to make the connection. I create the Observable in the onPostExecute method. I want MyFragment to subscribe to this. How do I set this up?

public class LoadAndStoreDataTask extends AsyncTask<String,  Integer,  String> {
    ...

    @Override
    protected void onPostExecute(String result) {
         // create the observable    
        Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext(result);
                        subscriber.onCompleted();
                    }
                }
        );

        myObservable.subscribe(mySubscriber);
    }
}

public class MyFragment extends Fragment {

    ...

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        Subscriber<String> mySubscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) { System.out.println(s); }

            @Override
            public void onCompleted() { }

            @Override
            public void onError(Throwable e) { }
        };
    }
    ...
}

Upvotes: 2

Views: 1734

Answers (1)

inmyth
inmyth

Reputation: 9050

Actually RxJava is supposed to replace AsycTask. In fact I can say with confidence that AsyncTask is a subset of RxJava.

In RxJava, a Subscriber would be analogous to AsyncTask.progressUpdate or onPostExecute and Observable to the process in doInBackground. Data are emitted from Observable to Subscriber and any alteration in this stream is done with mapping methods. You probably don't need mapping now so I would reconfigure my RxJava like this:

 Observable<String> myObservable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {                      
               try{
                   String res = ...//your original doInBackground                       
                   subscriber.onNext(res); 
                   // onNext would be comparable to AsyncTask.onProgressUpdate 
                   // and usually applies when backgorund process runs a loop
                   subscriber.onCompleted();
               }catch (SomeException e){
                   // if the process throws an exception or produces a result 
                   // you'd consider error then use onError                         
                   subscriber.onError(e);
               }
            }
        }
 )
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread()) // If subscriber runs on UI thread
 .subscribe(new Subscriber<String>() {

        @Override
        public void onNext(String response) {
          // result from Observable.onNext. The methods below correspond
          // to their Observable counterparts. 
        }

        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}
 });

AndroidSchedulers is available in RxAndroid. To use it add this line to build.gradle :

compile 'io.reactivex:rxandroid:0.24.0'

Upvotes: 4

Related Questions