Victor Grazi

Reputation: 16490

How do you unsubscribe from a live stream?

I am trying to understand how to unsubscribe from an Observable that is created from a live feed. Here is more or less the code:

SomeFeed feed = new SomeFeed();
Observable<PriceTick> observable = Observable.create(s ->
  feed.register(new SomeListener() {
    @Override
    public void priceTick(PriceTick event) {
      s.onNext(event);
    }

    @Override
    public void error(Throwable throwable) {
      s.onError(throwable);
    }
  })
);
Subscription subscription = observable.subscribe(System.out::println);
subscription.unsubscribe();
System.out.println("Is unsubscribed:" + subscription.isUnsubscribed()); // prints true

I am finding that after the subscription is unsubscribed, the subscriber is still outputting the event stream.

How can I make unsubscribe actually stop the subscriber from receiving further notifications?

Upvotes: 0

Views: 438

Answers (1)

LordRaydenMK

Reputation: 13321

The observable you are creating doesn't obey the Observable contract: it handles neither unsubscription nor backpressure. Observable.create makes it easy to write such non-compliant observables by accident.

Instead, you can use other techniques to create your observables, such as the defer operator or Observable.fromCallable.

For your code you can use:

Observable<PriceTick> observable = Observable.create(s ->
  feed.register(new SomeListener() {
    @Override
    public void priceTick(PriceTick event) {
      // Forward ticks only while the subscriber is still subscribed.
      if (!s.isUnsubscribed()) {
        s.onNext(event);
      }
    }

    @Override
    public void error(Throwable throwable) {
      // Likewise, do not signal errors after unsubscription.
      if (!s.isUnsubscribed()) {
        s.onError(throwable);
      }
    }
  })
);

This still doesn't handle backpressure.
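Note that the isUnsubscribed() guard only stops delivery; the listener itself stays registered with the feed. If the feed offers a way to remove a listener, it is usually cleaner to detach it when the subscriber unsubscribes. In RxJava 1.x that would probably look like s.add(Subscriptions.create(() -> feed.unregister(listener))), where unregister is an assumed method that SomeFeed may or may not have. The sketch below models the same idea in plain Java with no RxJava dependency; Feed, Handle, subscribe and unregister are all hypothetical stand-ins, not types from the question or from RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Dependency-free model of "unregister the listener on unsubscribe".
// None of these types come from RxJava or from the question's code.
class UnsubscribeDemo {

    // Hypothetical stand-in for SomeFeed: keeps listeners and pushes ticks to them.
    static final class Feed {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void register(Consumer<String> listener) { listeners.add(listener); }

        // Assumed API: the real feed may not expose anything like this.
        void unregister(Consumer<String> listener) { listeners.remove(listener); }

        void publish(String tick) { listeners.forEach(l -> l.accept(tick)); }

        int listenerCount() { return listeners.size(); }
    }

    // Minimal handle playing the role of a subscription.
    static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private volatile Runnable cleanup = () -> { };

        // Registers the action to run on unsubscribe (similar in spirit to Subscriber.add).
        void onUnsubscribe(Runnable action) { this.cleanup = action; }

        void unsubscribe() {
            // Run the cleanup at most once, even if unsubscribe() is called repeatedly.
            if (unsubscribed.compareAndSet(false, true)) {
                cleanup.run();
            }
        }

        boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    // Attaches a consumer to the feed and returns a handle that detaches it again.
    static Handle subscribe(Feed feed, Consumer<String> onNext) {
        Handle handle = new Handle();
        Consumer<String> listener = tick -> {
            // Guard against ticks that race with unsubscription.
            if (!handle.isUnsubscribed()) {
                onNext.accept(tick);
            }
        };
        handle.onUnsubscribe(() -> feed.unregister(listener));
        feed.register(listener);
        return handle;
    }

    // Publishes one tick before and one after unsubscribing; returns what was received.
    static List<String> run() {
        Feed feed = new Feed();
        List<String> received = new ArrayList<>();
        Handle handle = subscribe(feed, received::add);
        feed.publish("tick-1");
        handle.unsubscribe();
        feed.publish("tick-2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [tick-1]
    }
}
```

The guard inside the listener is kept on purpose: a tick that is already being dispatched when unsubscribe() runs could otherwise still slip through.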

More about creating observables here.

Upvotes: 1
