Usr
Usr

Reputation: 2848

RxJava subscription does not unsubscribe correctly

I have an MVC application in which I have to update the view with the current value of a stream. In the model I have this method:

public Observable<Integer> getStreamInstance(){
    if(stream == null){
        this.stream = Observable.create((Subscriber<? super Integer> subscriber) -> {
            new HeartbeatStream(frequence,subscriber).start();
        }); 
    }

    return stream;
}

which I use in the controller to get the stream. Then, in the controller I have these two methods:

public void start(){
    this.sb = stream.subscribe((Integer v) -> {
        view.updateCurrValue(v);
    });
}

public void stop(){
    this.sb.unsubscribe();      
}

With the start method I simply update a label in the view with the current value. This works fine until I try to stop the updating with the unsubscribing; infact, when I press the button "stop" in the view, the label keeps updating with the current value and, if I press "start" again, the label shows the values from two different streams, the one that I first created with the first "start" and the second that seems has been created with the second pressing of "start". Where am I wrong?

EDIT:

public class HeartbeatStream extends Thread{

private Subscriber<? super Integer> subscriber;
private int frequence;
private HeartbeatSensor sensor;

public HeartbeatStream(int freq, Subscriber<? super Integer> subscriber){
    this.frequence = freq;
    this.subscriber = subscriber;
    sensor = new HeartbeatSensor();
}

public void run(){

    while(true){        
        try {
            subscriber.onNext(sensor.getCurrentValue());
            Thread.sleep(frequence);
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }


}

This is the HeartbeatStream class. HeartbeatSensor is a class that periodically generates a value that simulates the heartbeat frequence.

Upvotes: 0

Views: 227

Answers (3)

paul
paul

Reputation: 13481

Observable by design unsubscribe your observer once that all items are emitted and onComplete callback is invoked.

Look this example https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java

Upvotes: 1

akarnokd
akarnokd

Reputation: 70017

I'm guessing you tried to periodically signal some event that triggers the screen update. There is an operator for that:

Observable<Long> timer = Observable.interval(period, TimeUnit.MILLISECONDS,
     AndroidSchedulers.mainThread());

SerialSubscription serial = new SerialSubscription();

public void start() {
    serial.set(timer.subscribe(v -> view.updateCurrValue(v)));
}

public void stop() {
    serial.set(Subscriptions.unsubscribed());
}

public void onDestroy() {
    serial.unsubscribe();
}

Upvotes: 2

Will
Will

Reputation: 6721

I guess you're not handling the unsubscribe - although I can't see what's going on in your HeartbeatStream class.

If you're creating an Observable with Observable.create then you need to handle unsubscribing explicitly with subscriber.isUnsubscribed().

Where possible use some of the utility methods to create an Observable - they handle this all for you eg Observable.just() or Observable.from().

If this doesn't help, please post your HeartbeatStream class.

See the the docs for more details: https://github.com/ReactiveX/RxJava/wiki/Creating-Observables https://github.com/ReactiveX/RxJava/wiki/Async-Operators

Upvotes: 0

Related Questions