Cheng
Cheng

Reputation: 785

Server polling using RxJava - repeat after geting server response

Currently I am trying to implement server polling using RxJava, I have done some research on how to repeat the whole chain after receiving server response, I have tried with repeat(), it works but does not work that perfect, the reason is that it makes api call so many times and server need extra time to process the data before sending to the client, but we dont know exact time, so we cannot use repeatWhen() to give specific time. The only thing I can use is to wait after api response.

Any advice will be appreciated!

The following is code snippet:

retrofitService.requestPolling()
              .repeat()   // do not wait to call server so many times
              .takeUntil(new Func1<PollResponse, Boolean>() {
                      @Override
                      public Boolean call(PollResponse pollResponse) {
                            return pollResponse.mComplete;
                       }
               })
             .doOnNext(new Action1<FlightSearchPollResponse>() {
                @Override
                public void call(pollResponse pollResponse) {
                      // update UI here
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<PollResponse>() {
                @Override
                public void onCompleted() {

                    }
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(PollResponse pollResponse) {

            } );

EDIT: I am new to RxJava, just got this topic called BackPressure, and there are lots of articles explained how to deal with it, since i dont want to cache this response, seems Subject would be a good option, it allows you to control when to pull.

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html

And thanks to @Gary LO

Upvotes: 1

Views: 1053

Answers (1)

Gary LO
Gary LO

Reputation: 1033

There should be many approaches. I would like to share one of those.

  1. Create a separate signal stream PublishSubject pollingSignal
  2. Transform the signal to an api call
  3. Publish a signal to do it again.

    final PublishSubject<Boolean> pollingSignal = PublishSubject.create();
    
    final Observable<PollResponse> apiResponse = retrofitService.requestPolling();
    
    pollingSignal
      .flatMap(x -> apiResponse)
      .subscribe(new Observer<PollResponse>() {
        @Override
        public void onCompleted() {}
    
        @Override
        public void onError(Throwable throwable) {}
    
        @Override
        public void onNext(PollResponse integer) {
          // start the next polling 
          pollingSignal.onNext(true);
      }
    });
    
    // start the first polling 
    pollingSignal.onNext(true);
    

Have fun!

Notes Using PublishSubject<Boolean> instead of PublishSubject<Void> is because I don't feel comfortable to use pollingSignal.onNext(null).

But in Kotlin, I can use PublishSubject<Unit> with pollingSignal.onNext(Unit)

Upvotes: 1

Related Questions