Reputation: 45
I'm new to RxJava and I'm trying to use it as an event bus. I'm following the RxBus example by Kaushik Gopal (https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java).
Here is my RxBus code:
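import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class RxBus {
    private static RxBus _instance;
    private final Subject<Object, Object> _bus;

    // Lazily created singleton; synchronized so concurrent callers share one instance.
    public static synchronized RxBus getInstance() {
        if (_instance == null) {
            _instance = new RxBus();
        }
        return _instance;
    }

    private RxBus() {
        // SerializedSubject makes onNext() safe to call from multiple threads.
        _bus = new SerializedSubject<>(PublishSubject.create());
    }

    // Post an event to all current subscribers.
    public void send(Object o) {
        _bus.onNext(o);
    }

    // Expose the bus as an Observable for subscribers.
    public Observable<Object> toObserverable() {
        return _bus;
    }

    public boolean hasObservers() {
        return _bus.hasObservers();
    }
}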
I'm posting events from a different thread and subscribing on the UI thread. Here is my subscribe code:
public abstract class BaseActivity extends AppCompatActivity {
    protected CompositeSubscription _subscriptions;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        ...
        _subscriptions = new CompositeSubscription();
        ...
    }

    @Override
    protected void onResume() {
        ....
        _subscriptions.add(RxBus.getInstance().toObserverable()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(final Object event) {
                        //Do something here
                    }
                }));
        .....
    }

    @Override
    protected void onPause() {
        ......
        _subscriptions.unsubscribe();
    }
}
When I first load the activity, I receive the events. But if I close and reopen the activity (triggering onPause and onResume), I stop receiving them. Am I missing anything here? Is this the right way to use RxJava as an event bus? Again, I'm posting from a different thread and subscribing on the UI thread. Thanks.
Upvotes: 1
Views: 1374
Reputation: 45
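I figured out the issue: I had to call _subscriptions.clear() instead of _subscriptions.unsubscribe() in onPause(). Once unsubscribe() has been called on a CompositeSubscription, the composite itself is marked as unsubscribed, so any subscription added to it afterwards (here, in the next onResume()) is unsubscribed immediately. clear() unsubscribes the subscriptions it currently holds but leaves the composite usable for new ones.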
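To illustrate why clear() and unsubscribe() behave differently here, below is a minimal plain-Java sketch. It is not the RxJava source: SimpleSubscription and SimpleComposite are hypothetical stand-ins that only model the RxJava 1.x behaviour described in this thread (a composite that has been unsubscribed rejects later additions, while a cleared one stays reusable).

    import java.util.ArrayList;
    import java.util.List;

    final class CompositeSketch {

        /** Hypothetical stand-in for rx.Subscription. */
        static final class SimpleSubscription {
            private boolean unsubscribed;
            void unsubscribe() { unsubscribed = true; }
            boolean isUnsubscribed() { return unsubscribed; }
        }

        /** Simplified model of CompositeSubscription (an assumption, not the real class). */
        static final class SimpleComposite {
            private final List<SimpleSubscription> items = new ArrayList<>();
            private boolean unsubscribed;

            void add(SimpleSubscription s) {
                if (unsubscribed) {
                    // A dead composite unsubscribes anything added to it right away.
                    s.unsubscribe();
                    return;
                }
                items.add(s);
            }

            /** Unsubscribes the current items; the composite stays usable. */
            void clear() {
                for (SimpleSubscription s : items) {
                    s.unsubscribe();
                }
                items.clear();
            }

            /** Unsubscribes the current items and marks the composite as dead. */
            void unsubscribe() {
                unsubscribed = true;
                clear();
            }
        }

        /** Models onPause() calling unsubscribe(), then onResume() adding again. */
        static boolean aliveAfterUnsubscribe() {
            SimpleComposite composite = new SimpleComposite();
            composite.add(new SimpleSubscription());   // first onResume()
            composite.unsubscribe();                   // onPause()
            SimpleSubscription second = new SimpleSubscription();
            composite.add(second);                     // second onResume()
            return !second.isUnsubscribed();
        }

        /** Models onPause() calling clear(), then onResume() adding again. */
        static boolean aliveAfterClear() {
            SimpleComposite composite = new SimpleComposite();
            composite.add(new SimpleSubscription());   // first onResume()
            composite.clear();                         // onPause()
            SimpleSubscription second = new SimpleSubscription();
            composite.add(second);                     // second onResume()
            return !second.isUnsubscribed();
        }

        public static void main(String[] args) {
            System.out.println("alive after unsubscribe(): " + aliveAfterUnsubscribe()); // false
            System.out.println("alive after clear(): " + aliveAfterClear());             // true
        }
    }

In the activity from the question, this corresponds to replacing _subscriptions.unsubscribe() with _subscriptions.clear() in onPause(). Creating a new CompositeSubscription in onResume() would presumably work as well, though that variant was not tried in this thread.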
Upvotes: 2