Reputation: 475
Okay, so I am new to RxJava2 (well, I don't know RxJava either) and am trying to develop an Android app using RxJava2 and MVP structure.
In that app, I am making async calls to a library that uses listeners. I set the listener using a "standard" setListener / registerListener method.
One of the methods returns values in "real time": I call the start() method of my library, and am then notified on my listener at each modification of the list (whenever an item is added or removed).
I don't really grasp how to achieve this behavior with RxJava, since the listener is registered inside the definition of the emitter / subscriber. Where should I declare the listener? Where should I unsubscribe? Which object should I be using?
I started the dev using Nucleus, but can switch to another boilerplate or do one myself.
Here is some pseudo-code illustrating my question:
Before
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...
    mMyLib.setListener(this);
    mMyLib.startDiscovery();
}

@Override
public void itemListChanged(List<Dummy> items) {
    // update the UI with the list items
}

@Override
protected void onDestroy() {
    super.onDestroy();
    mMyLib.setListener(null);
}
Using Nucleus, in my presenter
Where should I unsubscribe if I want to receive modifications of the list as long as my activity / presenter are alive? Am I even using the right syntax/ objects?
private static final int REQUEST_ITEMS = 1;
private PublishSubject<Integer> pageRequests = PublishSubject.create();
...
@Override
public void onCreate(Bundle savedState) {
    super.onCreate(savedState);
    restartableReplay(REQUEST_ITEMS,
            () -> Observable.create(e -> {
                mMyLib.setListener(new MyLib.Listener() {
                    @Override
                    public void itemListChanged(List<Dummy> items) {
                        Log.d(TAG, "meh itemListChanged");
                        e.onNext(items);
                        e.onComplete();
                    }
                });
                mMyLib.startDiscovery();
            }),
            MainFragment::onItems,
            MainFragment::onNetworkError);
}

void request() {
    start(REQUEST_ITEMS);
}
Upvotes: 0
Views: 354
Reputation: 10267
You're heading in the right direction; this is a valid way to wrap async callbacks with RxJava. A few comments:
- Calling e.onComplete() in itemListChanged is wrong, as it ends your Observable sequence after the first list. You might not need to call onComplete() at all, since these are never-ending notifications from an outside source; otherwise call it once, when the notifications really end (the outside source has finished producing items). Do not confuse it with unsubscribing from the source.
- In create(), call e.setCancellable() with your cancellation logic (mMyLib.setListener(null) or any additional resource clean-up code).
- Consider share() to get a 'hot' Observable that can multicast to several subscribers.
- restartableReplay will cache your Observable and replay emitted items. That might be wrong for this kind of 'hot' stream of events (unless you need to replay, say, the last emission). Additionally, caching can be problematic, as you may lose the ability to unsubscribe, so be sure you are using Nucleus correctly here.

> Where should I unsubscribe if I want to receive modifications of the list as long as my activity / presenter are alive? Am I even using the right syntax/ objects?
You simply need to unsubscribe wherever you want to stop getting notifications; whether that is in onStop() or onDestroy() is up to you.
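To make the comments above concrete, here is a minimal sketch, assuming RxJava 2 is on the classpath. The MyLib class below is a hypothetical stand-in for your library: only setListener and startDiscovery come from your post, while hasListener and simulateChange are helpers I added so the sketch runs on its own, and String replaces your Dummy type. It registers the listener inside create(), clears it through setCancellable(), never calls onComplete(), and uses share() so several subscribers reuse one listener.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.Arrays;
import java.util.List;

public class ListenerBridgeSketch {

    // Hypothetical stand-in for the library in the question.
    static class MyLib {
        interface Listener {
            void itemListChanged(List<String> items);
        }

        private Listener listener;

        void setListener(Listener listener) { this.listener = listener; }

        void startDiscovery() { /* a real library would start reporting changes here */ }

        // Helper for this sketch only: tells whether a listener is registered.
        boolean hasListener() { return listener != null; }

        // Helper for this sketch only: simulates the library reporting a change.
        void simulateChange(List<String> items) {
            if (listener != null) listener.itemListChanged(items);
        }
    }

    // Wraps the listener API in an Observable that stays open until disposed.
    static Observable<List<String>> itemChanges(MyLib lib) {
        return Observable.<List<String>>create(emitter -> {
            // Cleanup runs when the downstream disposes (with share(): the last subscriber).
            emitter.setCancellable(() -> lib.setListener(null));
            // Forward every change; no onComplete(), the stream has no natural end.
            lib.setListener(emitter::onNext);
            lib.startDiscovery();
        }).share(); // one listener registration shared by all current subscribers
    }

    public static void main(String[] args) {
        MyLib lib = new MyLib();

        Disposable subscription = itemChanges(lib).subscribe(
                items -> System.out.println("items: " + items),
                error -> System.err.println("error: " + error));

        lib.simulateChange(Arrays.asList("a"));
        lib.simulateChange(Arrays.asList("a", "b"));

        // In an Activity or presenter this call would go in onStop() or onDestroy().
        subscription.dispose();
        System.out.println("listener still set: " + lib.hasListener());
    }
}
```

In your app you would keep the Disposable returned by subscribe() as a field (or let Nucleus manage it) and dispose it from whichever lifecycle callback you pick.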
Upvotes: 1