Kuanysh Raimbekov
Kuanysh Raimbekov

Reputation: 446

how to unit test concurrent rxjava execution

I have two Completables running on two different threads concurrently and accessing shared resources. I want to test their behavior so that only the first one runs certain piece of code and the second ends with error.

Since I am using Scheduler.trampoline() in tests, the two Completables cannot run simultaneously, but run in sequence and I am unable to unit test that code.

Example,

    val subscription = userRepository.logout().test()
    val subscriptionSimultaneous = userRepository.logout().test()


    subscription
        .assertNoErrors()
        .assertComplete()

    subscriptionSimultaneous
        .assertError(someError)
        .assertNotComplete()

    verify(exactly = 1) { somethingThatMustRunOnlyOnce() }

Upvotes: 1

Views: 396

Answers (1)

Tuby
Tuby

Reputation: 3253

I actually took effort to implement such a test.

public class RxTest {
    @Test
    public void testConcurrency() {
        Logout logout = new Logout();

        AtomicInteger logoutCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        Completable logoutCompletable = Completable.fromAction(() -> logout.logout())
                .subscribeOn(Schedulers.io())
                .doOnComplete(() -> logoutCount.addAndGet(1))
                .doOnError(error -> errorCount.addAndGet(1))
                .onErrorComplete();
        int tries = 50;
        Completable[] arrayOfLogoutCompletables = new Completable[tries];
        for (int i = 0; i < tries; i++) {
            arrayOfLogoutCompletables[i] = logoutCompletable;
        }
        // run all in parallel and wait for all to finish
        Completable.mergeArray(arrayOfLogoutCompletables).blockingAwait();

        assertEquals(1,logoutCount.get());
        assertEquals(tries - 1, errorCount.get());
    }


    private static class Logout {
        private boolean loggedOut = false;

        /**
         * if you remove synchronized test will fail!!
         */
        private synchronized void logout() {
            if (loggedOut) throw new IllegalStateException();
            loggedOut = true;
        }

    }
}

The tests is running up to 50 Completables at the same time on Schedulers.io(), each calling logout(). There are counters that count how many times logout() succeeds and fails. blockingAwait is waiting for all Completables to finish. Run this test 100 times and it will fail maybe 20% percent of the time if you remove synchronized. onErrorComplete() is there to avoid propagating exception before all Completables finish.

Fun fact: if you add getter and setter to loggedOut and use it inside logout() it will fail most of the time without synchronized.

Hope it helps!

Upvotes: 1

Related Questions