miracle-doh
miracle-doh

Reputation: 596

How to run the function inside Observable.create everytime when it is called

I'm very new to RXJava. I have a function called politelyrefresh() that concats two observables together, but the functions in these two observables only run the first time I called politeRefresh, I'm not sure this is the right way to do it. What I want is run this function inside the observables everytime.

public void politelyRefresh() {
    Observable.concat(refreshStoreDataObservable, refreshProjectDataObservable)
            .finallyDo(()-> {
                try {     
                 //someother other long runnning-network requests

                } catch (Exception e) {
                    Log.e(TAG, "politelyRefresh finallyDo Error", e);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(reloadUiFromLocalStorageSubscriber);

}

//the other observable is pretty much the same but making another request

Observable<String> refreshStoreDataObservable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {

  //DOESN'T GET HERE SECOND TIME!

        Store.syncStores(new ListCallback() {
            @Override
            public void syncSuccess() {
                getSyncStateManager().setStoresRefreshed();
                subscriber.onCompleted();
            }

            @Override
            public void syncError() {

                subscriber.onError(new Throwable("SYNC STORES ERROR"));
                getSyncStateManager().setStoresSyncCompleted();
            }
        });

    }
});
Subscriber<String> reloadUiFromLocalStorageSubscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {
        if (mStoreRefreshLayout != null){
            mStoreRefreshLayout.setRefreshing(false);
        }
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "reloadUiFromLocalStorageSubscriber: onError", e);
        if (mStoreRefreshLayout != null){
            mStoreRefreshLayout.setRefreshing(false);
        }
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "reloadUiFromLocalStorageSubscriber: onNext " + s);
    }
};

Upvotes: 1

Views: 1497

Answers (2)

miracle-doh
miracle-doh

Reputation: 596

I finally got this to work by move the subscriber from an class instance to inside the .subscribe() function(). I have no idea why this is happening.

Observable.concat(refreshStoreDataObservable, refreshProjectDataObservable)
        .finallyDo(()-> {
            try {     
             //someother other long runnning-network requests

            } catch (Exception e) {
                Log.e(TAG, "politelyRefresh finallyDo Error", e);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe( new Subscriber<String>() { /*rest of code  */}); //**here

Upvotes: 0

bkach
bkach

Reputation: 1441

I think you're looking for Observable.defer(). What this basically does is defer the creation of the Observable to when it is being subscribed to.

Here's a quick example:

public class Refresher {

    Refresher() {
        politelyRefresh();
        politelyRefresh();
    }

    public void politelyRefresh() {
        Observable.defer(() -> Observable.concat(refreshProjectData(), refreshStoreData()))
                .map(this::processData)
                .subscribe(this::printData);
    }

    private Observable<String> refreshStoreData() {
        System.out.println("StoreData Refreshed");
        return Observable.just("data1","data2","data3");
    }

    private Observable<String> refreshProjectData() {
        System.out.println("ProjectData Refreshed");
        return Observable.just("Project1","Project2", "Project3");
    }

    private String processData(String data) {
        return data + " processed";
    }

    private void printData(String data) {
        System.out.println(data);
    }

}

If you instantiate our refresher object, you'll get

StoreData Refreshed
StoreData Refreshed
Project1 processed
Project2 processed
Project3 processed
data1 processed
data2 processed
data3 processed
StoreData Refreshed
StoreData Refreshed
Project1 processed
Project2 processed
Project3 processed
data1 processed
data2 processed
data3 processed

If you'd like something to run on a different thread, you'd specify that on the specific observable you're looking to run on a non-ui thread.

So, for example, you might want to run the Observable in politelyRefresh on a background thread and subscribe to it on the UI thread. The creation of the other Observables will happen in a background thread too!

Upvotes: 1

Related Questions