Reputation: 824
My intention was to be able to pipeline a set amount of Completable however, it does not work as expected as seen from the following test case.
@Test
public void test_and_then() {
Completable c1 = Completable.fromAction(() -> {
System.out.println("<1>");
});
Completable c2 = Completable.fromAction(() -> {
System.out.println("<2.1>");
boolean test = true;
if (test) {
throw new Exception();
}
System.out.println("<2.2>");
});
Completable c3 = Completable.fromAction(() -> {
System.out.println("<3>");
});
Completable.complete()
.andThen(c1)
.andThen(c2)
.andThen(c3)
.test()
.assertError(t -> true)
.dispose();
c3.test().assertNotSubscribed();
}
This results in the following output.
<1>
<2.1>
<3>
java.lang.AssertionError: Subscribed! (latch = 0, values = 0, errors = 0, completions = 1)
As seen in the documentation of andThen it looks like it should not subscribe to any Completables added later if an error is detected in previous Completable. My problem is, why "<3>" is printed, basically, that Completable is run. How can I prevent this and simply end the chain of compietables instead? And why does it subscribe to the Completable c3 when the documentation indicates that this should not happen?
Upvotes: 0
Views: 234
Reputation: 17085
Actually, it works as you expected. However, you are subscribing to c3
:
c3.test().assertNotSubscribed();
This will subscribe to c3
and print <3>
.
If you remove that line, it'll work as you expect.
test()
subscribes and gives you a test observer that you can do assertions on. It's just unfortunate the output order - it makes you think the stream is not stopped.
Edit
Regarding the comment "what's the purpose of assertNotSubscribed
if it always fails".
This is quite a misconception to be honest. test()
is not the single way you have to create TestObserver
s. You can create a test observer like so
TestObserver<Void> testObserver = new TestObserver<>();
You can use assertNotSubscribed
to make sure you haven't subscribed the test observer yet. Actually, that method is supposed to check that onSubscribe
hasn't been called. It's not meant to test that dispose()
was called.
For this scenario, you'd have to do something like assertTrue(testObserver.isDisposed())
. You can follow the discussion here.
Upvotes: 1