Reputation: 2848
I have an MVC application in which I have to update the view with the current value of a stream. In the model I have this method:
public Observable<Integer> getStreamInstance(){
if(stream == null){
this.stream = Observable.create((Subscriber<? super Integer> subscriber) -> {
new HeartbeatStream(frequence,subscriber).start();
});
}
return stream;
}
which I use in the controller to get the stream. Then, in the controller I have these two methods:
public void start(){
this.sb = stream.subscribe((Integer v) -> {
view.updateCurrValue(v);
});
}
public void stop(){
this.sb.unsubscribe();
}
With the start method I simply update a label in the view with the current value. This works fine until I try to stop the updating with the unsubscribing; infact, when I press the button "stop" in the view, the label keeps updating with the current value and, if I press "start" again, the label shows the values from two different streams, the one that I first created with the first "start" and the second that seems has been created with the second pressing of "start". Where am I wrong?
EDIT:
public class HeartbeatStream extends Thread{
private Subscriber<? super Integer> subscriber;
private int frequence;
private HeartbeatSensor sensor;
public HeartbeatStream(int freq, Subscriber<? super Integer> subscriber){
this.frequence = freq;
this.subscriber = subscriber;
sensor = new HeartbeatSensor();
}
public void run(){
while(true){
try {
subscriber.onNext(sensor.getCurrentValue());
Thread.sleep(frequence);
} catch (Exception e) {
subscriber.onError(e);
}
}
}
This is the HeartbeatStream class. HeartbeatSensor is a class that periodically generates a value that simulates the heartbeat frequence.
Upvotes: 0
Views: 227
Reputation: 13481
Observable by design unsubscribe your observer once that all items are emitted and onComplete callback is invoked.
Look this example https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java
Upvotes: 1
Reputation: 70017
I'm guessing you tried to periodically signal some event that triggers the screen update. There is an operator for that:
Observable<Long> timer = Observable.interval(period, TimeUnit.MILLISECONDS,
AndroidSchedulers.mainThread());
SerialSubscription serial = new SerialSubscription();
public void start() {
serial.set(timer.subscribe(v -> view.updateCurrValue(v)));
}
public void stop() {
serial.set(Subscriptions.unsubscribed());
}
public void onDestroy() {
serial.unsubscribe();
}
Upvotes: 2
Reputation: 6721
I guess you're not handling the unsubscribe
- although I can't see what's going on in your HeartbeatStream
class.
If you're creating an Observable
with Observable.create
then you need to handle unsubscribing explicitly with subscriber.isUnsubscribed()
.
Where possible use some of the utility methods to create an Observable - they handle this all for you eg Observable.just()
or Observable.from()
.
If this doesn't help, please post your HeartbeatStream class.
See the the docs for more details: https://github.com/ReactiveX/RxJava/wiki/Creating-Observables https://github.com/ReactiveX/RxJava/wiki/Async-Operators
Upvotes: 0