David Liu

Reputation: 9600

How to get notified of an observer's dispose action in a custom Observable in RxJava2

In this thread, a question is posed about how to observe the unsubscribe event so that you can clean up and remove the listener once the observer has unsubscribed. However, in RxJava2, the method suggested in that thread no longer works.

def myObservable = Observable.create({ aEmitter ->
    val listener = {event -> 
      aEmitter.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
    aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

What is the proper way of addressing this in RxJava2?

Upvotes: 0

Views: 656

Answers (1)

Sergej Isbrecht

Reputation: 4012

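Take a look at how the stringObservable below handles this: it registers the listener inside Observable.create and passes the cleanup to e.setCancellable(...), which RxJava2 runs when the observer disposes.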

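// Assumes JUnit 4, Mockito 2+, and RxJava 2 on the classpath.
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import java.util.ArrayList;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class MyTest {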
  @Mock private MyService mock;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
  }

  @Test
  public void removesListenerOnDispose() {
    ArrayList<Listener> listeners = new ArrayList<>();

    doAnswer(
            invocation -> {
              Object[] args = invocation.getArguments();
              Listener arg = (Listener) args[0];

              listeners.add(arg);

              return null;
            })
        .when(mock)
        .addListener(any());

    Observable<String> stringObservable =
        Observable.create(
            e -> {
              Listener listener =
                  s -> {
                    e.onNext(s);
                  };

              mock.addListener(listener);

              e.setCancellable(
                  () -> {
                    mock.removeListener(listener);
                  });
            });

    TestObserver<String> test = stringObservable.test();

    Listener listener = listeners.get(0);
    listener.onNext("Wurst");

    test.assertNotComplete().assertValue("Wurst");
    verify(mock, times(1)).addListener(any());

    test.dispose();

    verify(mock, times(1)).removeListener(any());
  }

  public interface MyService {
    void addListener(Listener listener);

    void removeListener(Listener listener);
  }

  @FunctionalInterface
  public interface Listener {
    void onNext(String s);
  }
}
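
For reference, here is the same idea without the Mockito scaffolding, applied to the shape of the code in the question. This is only a minimal sketch: EventSource is a hypothetical stand-in for existingEventSource, and the class and method names are made up for illustration. It assumes RxJava 2 (io.reactivex) is on the classpath.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ListenerObservableExample {

  // Hypothetical stand-in for existingEventSource from the question.
  static class EventSource {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
      listeners.add(listener);
    }

    void removeListener(Consumer<String> listener) {
      listeners.remove(listener);
    }

    void fire(String event) {
      for (Consumer<String> listener : listeners) {
        listener.accept(event);
      }
    }

    int listenerCount() {
      return listeners.size();
    }
  }

  // Wraps the listener API in an Observable that unregisters on dispose.
  static Observable<String> events(EventSource source) {
    return Observable.create(emitter -> {
      Consumer<String> listener = emitter::onNext;
      source.addListener(listener);
      // The cancellable runs when the downstream disposes the subscription.
      emitter.setCancellable(() -> source.removeListener(listener));
    });
  }

  public static void main(String[] args) {
    EventSource source = new EventSource();
    Disposable subscription =
        events(source).subscribe(event -> System.out.println("got " + event));

    source.fire("first");   // delivered to the subscriber
    subscription.dispose(); // triggers removeListener
    source.fire("second");  // nobody is listening any more

    System.out.println("listeners left: " + source.listenerCount());
  }
}
```

If you would rather hand the emitter a Disposable than a Cancellable, emitter.setDisposable(Disposables.fromAction(...)) should work the same way. As far as I know the emitter only holds one such resource at a time, so pick one of the two.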

Upvotes: 2
