Reputation: 171
I can not understand how to translate a simple AsyncTask in RxJava. Take for example:
private class Sync extends AsyncTask<String, String, String> {
@Override
protected String doInBackground(String... params) {
String proxy_arr = "";
try {
Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
.userAgent(Constants.USER_AGENT)
.ignoreContentType(true)
.ignoreHttpErrors(true)
.timeout(Constants.USER_TIMEOUT)
.get();
if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();
} catch (IOException e) {
new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(e));
}
return proxy_arr;
}
@Override
protected void onPostExecute(String result) {
if (result.equals("err_internet")){
func.toastMessage(R.string.toast_err_nointernet, "", "alert");
}
reloadAdapter();
}
}
As it can be translated in the same working condition RxJava? Thank you!
Upvotes: 4
Views: 1887
Reputation: 61
When you convert a functionality to be reactive please keep in mind that you should define
"events".
Actually rx is working well with data sequences, but of course you can create a data sequence with only one emitted item. So to modify your method to be reactive I'd say first you should decouple the responsibilities.
Somewhere in a repository class or you name it according to your architecture you just create this:
public Observable<String> getProxyAsync() {
return Observable.create(subscriber -> {
String proxy_arr = "";
try {
Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
.userAgent(Constants.USER_AGENT)
.ignoreContentType(true)
.ignoreHttpErrors(true)
.timeout(Constants.USER_TIMEOUT)
.get();
if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();
subscriber.onNext(proxy_arr);
} catch (IOException e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
after that, somewhere near to the activity, just subscribe to this method like this:
public void myPreciousMethod() {
myCustomRepo.getProxyAsync()
.subscribeOn(Schedulers.newThread())
.subscribe(result -> {
runOnUiThread(() -> {
if (result.equals("err_internet")) {
func.toastMessage(R.string.toast_err_nointernet, "", "alert");
}
});
}, throwable -> {
// some exception happened emmited by your code, handle it well
new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(e));
}, () -> {
// onCompleted:
runOnUiThread(() -> reloadAdapter());
});
}
I would suggest to use .runOnUiThread()
(in your activity or any other view related operation with rx) to avoid backpressure, but it really depends on the amount and frequency of emitted data. (you can use .observeOn()
and .subscribeOn()
as well) Plus using the retrolambda is highly recommended too in sake of much more cleaner code.
Upvotes: 3
Reputation: 9207
Instead of using Observable.create
you should use either Observable.defer()
or better yet Observable.fromCallable
(which was introduced in RxJava 1.0.15) - because these methods will ensure a proper observable contract and save you from some mistakes you can introduce when creating observable by hand.
Also instead of going with runOnUiThread
as suggested in one of the answers above, you should really use AndroidSchedulers.mainThread()
which was created for exactly this purpose. Just use RxAndroid library which provides it.
I suggest the following solution:
public Observable<String> getJsoupProxy() {
return Observable.fromCallable(() -> {
try {
Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
.userAgent(Constants.USER_AGENT)
.ignoreContentType(true)
.ignoreHttpErrors(true)
.timeout(Constants.USER_TIMEOUT)
.get();
return jsoup_proxy != null ? jsoup_proxy.text().trim() : "";
} catch (IOException e) {
// just rethrow as RuntimeException to be caught in subscriber's onError
throw new RuntimeException(e);
}
});
}
getJsoupProxy()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // this scheduler is exported by RxAndroid library
.subscribe(
proxy -> {
if(proxy.equals("err_internet")) {
// toast
}
reloadAdapter();
},
error -> new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(error)));
Upvotes: 3
Reputation: 1368
This is one way of doing it. You could forego the defer if need be.
Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String proxy_arr = "";
try {
Document jsoup_proxy = Jsoup.connect(Constants.SITE_PROXY_LIST)
.userAgent(Constants.USER_AGENT)
.ignoreContentType(true)
.ignoreHttpErrors(true)
.timeout(Constants.USER_TIMEOUT)
.get();
if (jsoup_proxy != null) proxy_arr = jsoup_proxy.text().trim();
} catch (IOException e) {
new DebugLog(getActivity(), "News", "Sync PROXY", Log.getStackTraceString(e));
}
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(proxy_arr);
}
}
})
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String result) {
if (result.equals("err_internet")){
func.toastMessage(R.string.toast_err_nointernet, "", "alert");
}
reloadAdapter();
}
});
Upvotes: 1