benjamin.donze

Reputation: 496

RxJava flatMap and backpressure strange behavior

While writing a data synchronization job with RxJava, I discovered a strange behavior that I cannot explain. I'm quite new to RxJava and would appreciate some help.

Briefly, my job is quite simple. I have a list of element IDs. For each one, I call a web service to get the element by ID, do some processing, and make multiple calls to push the data to the DB. Loading data is faster than storing it, so I ran into OutOfMemory errors.

My code looks pretty much like the "failing" test below. While running some tests, I realized that removing this line:

flatMap(dt -> Observable.just(dt))

makes it work. The failing test's output clearly shows that unconsumed items pile up, which leads to an OutOfMemoryError. The working test's output shows that the producer always waits for the consumer, so it never runs out of memory.
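The two behaviors described above can be reproduced without RxJava. The sketch below is only an analogy and does not show how RxJava works internally. A java.util.concurrent.Semaphore stands in for the downstream's request(n) credits, which is roughly the limit that a maxConcurrent argument provides. The names DemandSketch and maxBacklog and the 5 ms delay are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class DemandSketch {
    // Runs a fast producer against a slow consumer and returns the largest number
    // of produced-but-not-yet-consumed items seen. credits <= 0 means "no limit".
    static int maxBacklog(int items, int credits) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Semaphore demand = credits > 0 ? new Semaphore(credits) : null;
        AtomicInteger backlog = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();
                    Thread.sleep(5);                        // slow "store" step
                    backlog.decrementAndGet();
                    if (demand != null) demand.release();   // ask the producer for one more item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < items; i++) {
            if (demand != null) demand.acquire();           // wait for a credit from the consumer
            maxSeen.accumulateAndGet(backlog.incrementAndGet(), Math::max);
            queue.put(i);                                   // fast "load" step (no delay)
        }
        consumer.join();
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no limit:  max backlog = " + maxBacklog(50, 0));
        System.out.println("2 credits: max backlog = " + maxBacklog(50, 2));
    }
}
```

With no credits, the producer never waits, so the backlog typically climbs toward the total item count, as in the failing test's log. With 2 credits, the producer can never get more than two items ahead of the consumer, as in the working test.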

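// Assumed imports (RxJava 1.x, JUnit 4); logger is a field of the enclosing test class, not shown
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
import rx.Observable;
import rx.schedulers.Schedulers;

public static class DataStore {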
    public Integer myVal;
    public byte[] myBigData;

    public DataStore(Integer myVal) {
        this.myVal = myVal;
        this.myBigData = new byte[1000000];
    }
}

@Test
public void working() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

@Test
public void failing() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(dt -> Observable.just(dt)) // only difference from working(): flatMap without maxConcurrent
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
    return Observable.<DataStore>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(200); // Here I synchronously call the WS to retrieve data
                s.onNext(new DataStore(value));
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(1000); // Here I synchronously call the DB to store data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

What is the explanation behind this behavior? How can I fix my failing test without removing the Observable.just(dt), which in my real case is an Observable.from(someListOfItems)?

Upvotes: 3

Views: 869

Answers (1)

akarnokd

Reputation: 69997

By default, flatMap merges an unlimited number of sources. When you apply that lambda without a maxConcurrent parameter, you essentially make the upstream unbounded. It can then run at full speed and overwhelm the internal buffers of the other operators.
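Below is a minimal sketch of that fix for the real case, where each loaded element expands into a list. It assumes a recent RxJava 1.x release, as the question's code uses; fromCallable is used for brevity. The fix is to pass maxConcurrent to the middle flatMap as well. The load and store methods, the two-item lists, and the timings are hypothetical stand-ins for the WS and DB calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.schedulers.Schedulers;

public class BoundedFlatMapSketch {
    static final int MAX_CONCURRENT_LOAD = 1;
    static final int MAX_CONCURRENT_STORE = 2;
    static int stored;         // number of items that went through store()
    static int maxUnconsumed;  // peak of emitted-but-not-yet-stored items

    // Hypothetical stand-in for the WS call: each id expands to a small list,
    // like the Observable.from(someListOfItems) in the real job.
    static Observable<List<Integer>> load(int id) {
        return Observable.fromCallable(() -> {
            Thread.sleep(20);
            return Arrays.asList(id * 10, id * 10 + 1);
        }).subscribeOn(Schedulers.io());
    }

    // Hypothetical stand-in for the slow DB write.
    static Observable<Boolean> store(int item) {
        return Observable.fromCallable(() -> {
            Thread.sleep(50);
            return true;
        }).subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        AtomicInteger unconsumed = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        stored = Observable.range(0, 20)
            .flatMap(BoundedFlatMapSketch::load, MAX_CONCURRENT_LOAD)
            // The fix: an explicit maxConcurrent keeps this flatMap from
            // requesting Long.MAX_VALUE from the load stage.
            .flatMap(list -> Observable.from(list), 1)
            .doOnNext(x -> peak.accumulateAndGet(unconsumed.incrementAndGet(), Math::max))
            .flatMap(BoundedFlatMapSketch::store, MAX_CONCURRENT_STORE)
            .doOnNext(ok -> unconsumed.decrementAndGet())
            .count()
            .toBlocking()
            .single();

        maxUnconsumed = peak.get();
        System.out.println("stored=" + stored + ", peak unconsumed=" + maxUnconsumed);
    }
}
```

Removing the ", 1" from the middle flatMap brings back the failing shape. That operator then requests everything from the load stage, so loaded lists keep piling up in its buffers even though store() still runs at most two at a time.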

Upvotes: 5
