Reputation: 446
I have two Completable
s running on two different threads concurrently and accessing shared resources. I want to test their behavior so that only the first one runs certain piece of code and the second ends with error.
Since I am using Scheduler.trampoline()
in tests, the two Completable
s cannot run simultaneously, but run in sequence and I am unable to unit test that code.
Example,
val subscription = userRepository.logout().test()
val subscriptionSimultaneous = userRepository.logout().test()
subscription
.assertNoErrors()
.assertComplete()
subscriptionSimultaneous
.assertError(someError)
.assertNotComplete()
verify(exactly = 1) { somethingThatMustRunOnlyOnce() }
Upvotes: 1
Views: 396
Reputation: 3253
I actually took effort to implement such a test.
public class RxTest {
@Test
public void testConcurrency() {
Logout logout = new Logout();
AtomicInteger logoutCount = new AtomicInteger(0);
AtomicInteger errorCount = new AtomicInteger(0);
Completable logoutCompletable = Completable.fromAction(() -> logout.logout())
.subscribeOn(Schedulers.io())
.doOnComplete(() -> logoutCount.addAndGet(1))
.doOnError(error -> errorCount.addAndGet(1))
.onErrorComplete();
int tries = 50;
Completable[] arrayOfLogoutCompletables = new Completable[tries];
for (int i = 0; i < tries; i++) {
arrayOfLogoutCompletables[i] = logoutCompletable;
}
// run all in parallel and wait for all to finish
Completable.mergeArray(arrayOfLogoutCompletables).blockingAwait();
assertEquals(1,logoutCount.get());
assertEquals(tries - 1, errorCount.get());
}
private static class Logout {
private boolean loggedOut = false;
/**
* if you remove synchronized test will fail!!
*/
private synchronized void logout() {
if (loggedOut) throw new IllegalStateException();
loggedOut = true;
}
}
}
The tests is running up to 50 Completables at the same time on Schedulers.io()
, each calling logout()
. There are counters that count how many times logout()
succeeds and fails. blockingAwait
is waiting for all Completables to finish.
Run this test 100 times and it will fail maybe 20% percent of the time if you remove synchronized
. onErrorComplete()
is there to avoid propagating exception before all Completables finish.
Fun fact: if you add getter and setter to loggedOut
and use it inside logout()
it will fail most of the time without synchronized
.
Hope it helps!
Upvotes: 1