Reputation: 16490
I am trying to understand how to unsubscribe from an Observable that is created from a live feed. Here is more or less the code:
SomeFeed feed = new SomeFeed();
Observable<PriceTick> observable = Observable.create(s ->
    feed.register(new SomeListener() {
        @Override
        public void priceTick(PriceTick event) {
            s.onNext(event);
        }
        @Override
        public void error(Throwable throwable) {
            s.onError(throwable);
        }
    })
);
Subscription subscription = observable.subscribe(System.out::println);
subscription.unsubscribe();
System.out.println("Is unsubscribed:" + subscription.isUnsubscribed()); // prints true
I am finding that after unsubscribe() is called, the subscriber still prints events from the stream.
How can I make unsubscribe actually stop the subscriber from receiving further notifications?
Upvotes: 0
Views: 438
Reputation: 13321
The observable you are creating doesn't obey the Observable contract: it handles neither unsubscription nor backpressure. Observable.create makes it easy to create such contract-breaking observables.
Instead, you can use other techniques to create your observables, such as the defer operator or Observable.fromCallable.
For your code you can use:
Observable<PriceTick> observable = Observable.create(s ->
    feed.register(new SomeListener() {
        @Override
        public void priceTick(PriceTick event) {
            // Only emit while the subscriber is still subscribed.
            if (!s.isUnsubscribed()) {
                s.onNext(event);
            }
        }
        @Override
        public void error(Throwable throwable) {
            // Likewise, don't signal errors after unsubscription.
            if (!s.isUnsubscribed()) {
                s.onError(throwable);
            }
        }
    })
);
This still doesn't handle backpressure.
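To see why the isUnsubscribed() guard stops the output, here is a rough plain-Java model of the idea. Nothing in it is RxJava: Feed, SimpleSubscription, subscribe, and demo are hypothetical stand-ins, and Feed.unregister in particular is an assumption, since the question doesn't show whether SomeFeed can remove a listener. The sketch also removes the listener on unsubscribe so the feed doesn't keep calling a dead callback. In RxJava 1.x the corresponding hook would presumably be s.add(Subscriptions.create(...)), if the feed offers such a method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Plain-Java model of the guard; none of these types come from RxJava.
class UnsubscribeSketch {

    /** Hypothetical stand-in for SomeFeed: pushes ticks to registered listeners. */
    static class Feed {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void register(Consumer<String> listener) { listeners.add(listener); }

        // Assumption: the real feed may or may not offer a method like this.
        void unregister(Consumer<String> listener) { listeners.remove(listener); }

        void publish(String tick) {
            for (Consumer<String> listener : listeners) {
                listener.accept(tick);
            }
        }

        int listenerCount() { return listeners.size(); }
    }

    /** Minimal stand-in for a subscription: a flag plus a cleanup action. */
    static class SimpleSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private volatile Runnable cleanup = () -> { };

        void setCleanup(Runnable cleanup) { this.cleanup = cleanup; }

        boolean isUnsubscribed() { return unsubscribed.get(); }

        void unsubscribe() {
            // Run the cleanup only once, even if unsubscribe() is called repeatedly.
            if (unsubscribed.compareAndSet(false, true)) {
                cleanup.run();
            }
        }
    }

    /** Registers a guarded listener and wires its removal into unsubscribe(). */
    static SimpleSubscription subscribe(Feed feed, Consumer<String> downstream) {
        SimpleSubscription sub = new SimpleSubscription();
        Consumer<String> listener = tick -> {
            // The guard: drop events once the subscriber has unsubscribed.
            if (!sub.isUnsubscribed()) {
                downstream.accept(tick);
            }
        };
        feed.register(listener);
        sub.setCleanup(() -> feed.unregister(listener));
        return sub;
    }

    /** Publishes one tick before and one after unsubscribing. */
    static List<String> demo() {
        Feed feed = new Feed();
        List<String> received = new ArrayList<>();
        SimpleSubscription sub = subscribe(feed, received::add);
        feed.publish("tick-1");
        sub.unsubscribe();
        feed.publish("tick-2"); // published after unsubscribe, so not delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model demo() returns a list containing only "tick-1". The guard alone would already be enough to drop "tick-2"; unregistering is an extra step that keeps the feed from holding on to listeners nobody is reading.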
More about creating observables here.
Upvotes: 1