SmallSani

Reputation: 171

AsyncTask in RxJava

I can't figure out how to translate a simple AsyncTask to RxJava. Take this for example:

private class Sync extends AsyncTask<String, String, String> {

    @Override
    protected String doInBackground(String... params) {
              String proxy_arr = "";
                    try {
                        Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
                                .userAgent(Constants.USER_AGENT)
                                .ignoreContentType(true)
                                .ignoreHttpErrors(true)
                                .timeout(Constants.USER_TIMEOUT)
                                .get();

                        if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();
                    } catch (IOException e) {
                        new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(e));
                    }
              return proxy_arr;
    }

    @Override
    protected void onPostExecute(String result) {
        if (result.equals("err_internet")){
            func.toastMessage(R.string.toast_err_nointernet, "", "alert");
        }

        reloadAdapter();
    }
}

How can this be translated to RxJava so that it works the same way? Thank you!

Upvotes: 4

Views: 1887

Answers (3)

When you convert a piece of functionality to be reactive, keep in mind that you should define the

  • onNext
  • onError
  • onCompleted

"events".
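As a library-free illustration of these three events for a source that emits a single item, here is a minimal sketch. `SimpleObserver`, `SingleItemContract`, `emit` and `record` are hypothetical names made up for this example; they are not RxJava types, and the sketch only mirrors the rule that a stream ends with either onCompleted or onError, never both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for an Rx subscriber; NOT the RxJava type.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

public class SingleItemContract {

    // Runs the task and emits either (onNext, onCompleted) or onError alone.
    static <T> void emit(Callable<T> task, SimpleObserver<T> observer) {
        T value;
        try {
            value = task.call();
        } catch (Exception e) {
            observer.onError(e);   // terminal event: nothing may follow it
            return;
        }
        observer.onNext(value);
        observer.onCompleted();    // terminal event for the success path
    }

    // Records the received events as strings so they are easy to inspect.
    static List<String> record(Callable<String> task) {
        List<String> events = new ArrayList<>();
        emit(task, new SimpleObserver<String>() {
            @Override public void onNext(String v) { events.add("next:" + v); }
            @Override public void onError(Throwable e) { events.add("error:" + e.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> "proxy list"));
        // prints [next:proxy list, completed]
        System.out.println(record(() -> { throw new java.io.IOException("timeout"); }));
        // prints [error:timeout]
    }
}
```

The point of the sketch is the early `return` after `onError`: the failure path never reaches `onCompleted`.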

Rx works best with data sequences, but of course you can create a sequence that emits only one item. So, to make your method reactive, I'd say you should first decouple the responsibilities.

Somewhere in a repository class (or whatever you call it in your architecture), create this:

    public Observable<String> getProxyAsync() {
        return Observable.create(subscriber -> {
            String proxy_arr = "";
            try {
                Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
                    .userAgent(Constants.USER_AGENT)
                    .ignoreContentType(true)
                    .ignoreHttpErrors(true)
                    .timeout(Constants.USER_TIMEOUT)
                    .get();

                if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();

                subscriber.onNext(proxy_arr);
                // Complete only on success: onError and onCompleted are mutually exclusive.
                subscriber.onCompleted();
            } catch (IOException e) {
                subscriber.onError(e);
            }
        });
    }

After that, somewhere near the activity, subscribe to this method like this:

public void myPreciousMethod() {
    myCustomRepo.getProxyAsync()
        .subscribeOn(Schedulers.newThread())
        .subscribe(result -> {
            runOnUiThread(() -> {
                if (result.equals("err_internet")) {
                    func.toastMessage(R.string.toast_err_nointernet, "", "alert");
                }
            });
        }, throwable -> {
            // an exception was emitted by your code; handle it properly
            new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(throwable));
        }, () -> {
            // onCompleted:
            runOnUiThread(() -> reloadAdapter());
        });
}

I would suggest using runOnUiThread() (in your activity, or for any other view-related operation with Rx) to get back onto the main thread; you can use .observeOn() together with .subscribeOn() for this as well. Backpressure depends on the amount and frequency of emitted data, so it is not a concern for a single emitted item like this one. Using Retrolambda is also highly recommended for the sake of much cleaner code.

Upvotes: 3

dimsuz

Reputation: 9207

Instead of using Observable.create, you should use either Observable.defer() or, better yet, Observable.fromCallable() (introduced in RxJava 1.0.15), because these methods ensure a proper observable contract and save you from mistakes that are easy to make when creating an observable by hand.

Also, instead of going with runOnUiThread as suggested in one of the answers above, you should really use AndroidSchedulers.mainThread(), which was created for exactly this purpose. It is provided by the RxAndroid library.

I suggest the following solution:

public Observable<String> getJsoupProxy() {
  return Observable.fromCallable(() -> {
      try {
        Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
          .userAgent(Constants.USER_AGENT)
          .ignoreContentType(true)
          .ignoreHttpErrors(true)
          .timeout(Constants.USER_TIMEOUT)
          .get();

        return jsoup_proxy != null ? jsoup_proxy.text().trim() : "";
      } catch (IOException e) {
        // just rethrow as RuntimeException to be caught in subscriber's onError
        throw new RuntimeException(e);
      }
    });
}

getJsoupProxy()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread()) // this scheduler is exported by RxAndroid library
  .subscribe(
     proxy -> {
       if(proxy.equals("err_internet")) {
         // toast
       }
       reloadAdapter();
     },
     error -> new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(error)));
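To make the roles of subscribeOn and observeOn in the chain above concrete, here is a library-free sketch of the same thread hand-off using plain java.util.concurrent. It is only an analogy under stated assumptions: the two single-thread executors stand in for Schedulers.io() and AndroidSchedulers.mainThread(), the class name `ThreadHopSketch` and the thread names are made up, and this is not how RxJava implements its schedulers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHopSketch {

    // Produces a value on a "background" thread, then handles it on a
    // "main-like" thread. Returns "<producer thread>-><handler thread>".
    static String run() {
        // Stand-in for Schedulers.io(): where the blocking work runs.
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        // Stand-in for AndroidSchedulers.mainThread(): where results are handled.
        ExecutorService mainLike =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // roughly the subscribeOn part: do the work off the caller's thread
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // roughly the observeOn part: deliver the result on another thread
                    .thenApplyAsync(
                            worker -> worker + "->" + Thread.currentThread().getName(),
                            mainLike)
                    .join();
        } finally {
            background.shutdown();
            mainLike.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints io->main-like
    }
}
```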

Upvotes: 3

asadmshah

Reputation: 1368

This is one way of doing it. You could forgo the defer if need be.

        Observable.defer(new Func0<Observable<String>>() {
                @Override
                public Observable<String> call() {
                    return Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            String proxy_arr = "";
                            try {
                                Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
                                        .userAgent(Constants.USER_AGENT)
                                        .ignoreContentType(true)
                                        .ignoreHttpErrors(true)
                                        .timeout(Constants.USER_TIMEOUT)
                                        .get();

                                if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();
                            } catch (IOException e) {
                                new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(e));
                            }
                            if (!subscriber.isUnsubscribed()) {
                                subscriber.onNext(proxy_arr);
                                // Signal the end of the single-item stream.
                                subscriber.onCompleted();
                            }
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String result) {
                    if (result.equals("err_internet")){
                        func.toastMessage(R.string.toast_err_nointernet, "", "alert");
                    }
                    reloadAdapter();
                }
            });

Upvotes: 1
