dmnk_89

Reputation: 497

RxJava Backpressure and number of calls to producer

I am trying to implement infinite scrolling in my Android application using backpressure in RxJava. I want it to call the external service only the requested number of times (for example, once after calling request(1)). But once I use flatMap, every subscription loads 16 pages.

My code is below, with the expected results written as tests. Almost every test fails because of the first request (with n = 16).

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {


    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservable = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // On subscribe, RxJava requests 16 elements here, probably because of flatMap.
                // After that first request of 16, everything seems to work fine,
                // even though the 'n' parameter is ignored.

                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);

            });
        });
        return metaObservable.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }


    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }


    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given

        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);

        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();


        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);

    }


    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given

        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);


        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();


        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);

    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);

    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {

        //given

        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }

}

Upvotes: 3

Views: 936

Answers (1)

Aaron

Reputation: 1009

Backpressure is intended to negotiate the behavior of concurrent producers and consumers. It lets the program author choose a strategy for what to do when data is produced faster than it is consumed.

That said, operators that combine observables, such as merge (which flatMap uses internally), will pass upstream a requested amount that does not correspond to the amount of data you actually need. When merging, the outer observable (an Observable of Observables) always receives a request for 16 on Android (128 on other platforms). Then, as the merge receives the inner Observables of List, each inner observable receives a request based on the amount requested by the downstream subscriber. To avoid this you would have to write an OnSubscribe<List<T>> function that manages the merging behavior internally, so that you create an Observable<List<T>> instead of an Observable<Observable<List<T>>>. Writing this would force you to subscribe to the observable returned by your data provider yourself, in order to unwrap each List<T> and pass it to onNext.
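To make the hand-written route concrete, here is a minimal sketch of a producer that fetches exactly one page per unit of downstream demand and unwraps each page itself. It is not RxJava code: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) so that it is self-contained, and it assumes a synchronous page lookup (an IntFunction) in place of DataProvider.requestPage() returning an Observable. The names DemandDrivenPages, pages and Recorder are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;

class DemandDrivenPages {

    /** Emits one page per unit of downstream demand; an empty page completes the stream. */
    static <T> Flow.Publisher<List<T>> pages(IntFunction<List<T>> requestPage) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private final AtomicBoolean done = new AtomicBoolean();
            private int nextPage = 0;

            @Override
            public void request(long n) {
                // Simplified: non-positive requests are ignored and demand is not capped.
                if (n <= 0 || done.get()) return;
                // Only the caller that raises demand from zero runs the loop,
                // so a request() made from inside onNext() does not re-enter it.
                if (demand.getAndAdd(n) != 0) return;
                do {
                    if (done.get()) return;
                    List<T> page = requestPage.apply(nextPage++); // one provider call per item
                    if (page.isEmpty()) {
                        done.set(true);
                        subscriber.onComplete();
                        return;
                    }
                    subscriber.onNext(page);
                } while (demand.decrementAndGet() > 0);
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }

    /** Minimal subscriber that records what it receives; demand is driven from outside. */
    static final class Recorder<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        List<Integer> calls = new ArrayList<>();
        Flow.Publisher<List<String>> publisher = pages(page -> {
            calls.add(page); // stands in for the remote call
            return List.of("item-" + page);
        });
        Recorder<List<String>> recorder = new Recorder<>();
        publisher.subscribe(recorder);
        System.out.println(calls); // pages fetched before any request
        recorder.subscription.request(1);
        System.out.println(calls); // pages fetched after request(1)
        recorder.subscription.request(2);
        System.out.println(calls); // pages fetched after a further request(2)
    }
}
```

Because nothing sits between this producer and the subscriber, no operator can prefetch on its behalf. The cost is that the demand accounting, completion and cancellation all have to be handled by hand, which is the burden described above.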

I suggest that you instead map screen y-positions into end-of-page events, use scan to convert those events into a monotonically increasing page number, and then concatMap that number into a call to DataProvider.requestPage().

screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    .scan(1, (pageNumber, event) -> pageNumber + 1)
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);

Upvotes: 3
