Reputation: 9600
In this thread, a question is posed about how to observe the unsubscribe event so that you can clean up and remove the listener after it's unsubscribed. However, in RxJava2, the method that the above thread no longer works.
def myObservable = Observable.create({ aEmitter ->
val listener = {event ->
aEmitter.onNext(event);
}
existingEventSource.addListener(listener)
// Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})
What is the proper way of addressing this in RxJava2?
Upvotes: 0
Views: 656
Reputation: 4012
Please look at the stringObservable Observable, how to handle subscriptions.
public class MyTest {
@Mock private MyService mock;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void nam3e() {
ArrayList<Listener> listeners = new ArrayList<>();
doAnswer(
invocation -> {
Object[] args = invocation.getArguments();
Listener arg = (Listener) args[0];
listeners.add(arg);
return null;
})
.when(mock)
.addListener(any());
Observable<String> stringObservable =
Observable.create(
e -> {
Listener listener =
s -> {
e.onNext(s);
};
mock.addListener(listener);
e.setCancellable(
() -> {
mock.removeListener(listener);
});
});
TestObserver<String> test = stringObservable.test();
Listener listener = listeners.get(0);
listener.onNext("Wurst");
test.assertNotComplete().assertValue("Wurst");
verify(mock, times(1)).addListener(any());
test.dispose();
verify(mock, times(1)).removeListener(any());
}
public interface MyService {
void addListener(Listener listener);
void removeListener(Listener listener);
}
@FunctionalInterface
public interface Listener {
void onNext(String s);
}
}
Upvotes: 2