Reputation: 497
I am trying to implement infinite scrolling in my Android application using backpressure in RxJava. I want it to call the external service only the requested number of times (once per request(1) call). But after using flatMap, every subscribe loads 16 pages.
My code with the expected results is below. Almost every test fails because of the first request (with n = 16).
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {

    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservable = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // at subscribe RxJava makes a request for 16 elements - probably because of flatMap
                // after the first request with 16 elements everything seems to work fine even if I ignore the 'n' param
                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);
            });
        });
        return metaObservable.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }

    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }

    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given
        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);
        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();
        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);
    }

    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given
        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();
        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);
    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);
        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);
        //when
        subscriber.requestMore(1);
        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);
    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {
        //given
        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);
        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);
        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);
        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }
}
Upvotes: 3
Views: 936
Reputation: 1009
Backpressure is intended to negotiate concurrent producer/consumer behavior: it lets the program author choose a strategy for what happens when data is produced faster than it is consumed.
That said, operators that combine observables, such as merge (which flatMap uses internally), will hand you a requested amount that doesn't correspond to the amount of data you actually need. When merging, the outer observable (an Observable of Observables) always receives a request for 16 on RxAndroid (128 in RxJava). Then, as it receives inner Observables of List, each inner observable receives a request based on the amount requested by the downstream subscriber. To avoid this, instead of an Observable<Observable<List<T>>> you would be forced to write an OnSubscribe function that manages the merging behavior internally, so that it produces an Observable<List<T>> directly. Writing this would force you to subscribe to the observable returned by your data provider in order to unwrap each List<T> and pass it to onNext.
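To make the prefetch behavior described above concrete, here is a toy model in plain Java. It is not RxJava code and does not reproduce how merge is implemented; the class `PrefetchModel` and its methods are hypothetical names invented for this illustration. It only shows why a stage that requests a fixed batch from upstream on subscribe causes 16 upstream calls even when the downstream consumer takes a single item.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntFunction;

/** Toy model (hypothetical, not RxJava) of a stage that prefetches a fixed batch from upstream. */
final class PrefetchModel {
    private final int prefetch;                  // e.g. 16, the buffer size mentioned above
    private final IntFunction<String> upstream;  // stands in for the page-producing source
    private final Queue<String> buffer = new ArrayDeque<>();
    private int upstreamCalls = 0;

    PrefetchModel(int prefetch, IntFunction<String> upstream) {
        this.prefetch = prefetch;
        this.upstream = upstream;
    }

    /** On subscribe, the stage asks upstream for `prefetch` items, whatever downstream wants. */
    void subscribe() {
        for (int i = 0; i < prefetch; i++) {
            buffer.add(upstream.apply(upstreamCalls++));
        }
    }

    /** Downstream demand is served from the buffer only; returns null when the buffer is empty. */
    String poll() {
        return buffer.poll();
    }

    int upstreamCalls() {
        return upstreamCalls;
    }
}
```

With a prefetch of 16, calling `subscribe()` and then `poll()` once hands back the first page, but `upstreamCalls()` already reports 16. That mirrors the symptom in the question, where a subscriber asking for one page still triggers 16 page loads.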
I suggest that you instead map screen y-positions into end-of-page events, then use scan to convert those events into a monotonically increasing number, and then concatMap that number into a call to DataProvider.requestPage().
screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    // scan's accumulator receives (accumulated value, new item); the seed is emitted first
    .scan(1, (pageNumber, event) -> pageNumber + 1)
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);
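The event-driven idea can also be sketched without any Rx operators, which may help when reasoning about what the tests in the question expect. The following is a minimal plain-Java sketch, not the RxJava pipeline itself: `SequentialPager` and `onEndOfPage` are hypothetical names, and the page source is assumed to be a synchronous function from page number to list. Each end-of-page event fetches exactly one page, the page number only ever increases, and an empty page stops further requests.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical sketch: one page request per end-of-page event, stopping at the first empty page. */
final class SequentialPager<T> {
    private final IntFunction<List<T>> requestPage; // assumed synchronous page source
    private int nextPage;
    private boolean exhausted = false;

    SequentialPager(int firstPage, IntFunction<List<T>> requestPage) {
        this.nextPage = firstPage;
        this.requestPage = requestPage;
    }

    /** Handles one end-of-page event; returns the fetched page, or an empty list once exhausted. */
    List<T> onEndOfPage() {
        if (exhausted) {
            return Collections.emptyList(); // no further calls to the page source
        }
        List<T> page = requestPage.apply(nextPage);
        if (page.isEmpty()) {
            exhausted = true;               // plays the role of takeWhile(page -> !page.isEmpty())
        } else {
            nextPage++;                     // plays the role of the scan counter
        }
        return page;
    }
}
```

The counter here corresponds to the value produced by scan, and fetching strictly one page per event corresponds to concatMap subscribing to one inner observable at a time. Whether the first page is 0 or 1 is left to the caller through the `firstPage` argument.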
Upvotes: 3