w00ly
Reputation: 475

RxJava and long running listener

Okay, so I am new to RxJava2 (and I have never used RxJava 1 either) and am trying to develop an Android app using RxJava2 and an MVP structure.

In that app, I am making async calls to a library that uses listeners. I set the listener using a "standard" setListener / registerListener method.

One of the methods returns values in real time: I call the start() method of my library and am then notified on my listener every time the list changes (when items are added or removed).

I don't really grasp how to achieve this behavior with RxJava, since the listener is registered inside the definition of the emitter / subscriber. Where should I declare the listener? Where should I unsubscribe? What object should I be using?

I started development with Nucleus, but I can switch to another boilerplate or write one myself.

Here is some pseudo-code illustrating my question:

Before

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...

    mMyLib.setListener(this);
    mMyLib.startDiscovery();
}

@Override
public void itemListChanged(List<Dummy> items) {
    // update the UI with the list items
}

@Override
protected void onDestroy() {
    super.onDestroy();
    mMyLib.setListener(null);
}

Using Nucleus, in my presenter

Where should I unsubscribe if I want to receive modifications of the list as long as my activity / presenter are alive? Am I even using the right syntax/ objects?

private static final int REQUEST_ITEMS = 1;
private PublishSubject<Integer> pageRequests = PublishSubject.create();

...

@Override
public void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableReplay(REQUEST_ITEMS,
            () -> Observable.create(e ->
                    {
                        mMyLib.setListener(new MyLib.Listener() {
                            @Override
                            public void itemListChanged(List<Dummy> items) {
                                Log.d(TAG, "meh itemListChanged");
                                e.onNext(items);
                                e.onComplete();
                            }
                        });
                        mMyLib.startDiscovery();
                    }
            )
            ,
            MainFragment::onItems,
            MainFragment::onNetworkError);
}

void request() {
    start(REQUEST_ITEMS);
}

Upvotes: 0

Views: 354

Answers (1)

yosriz

Reputation: 10267

You're heading in the right direction; this is a valid way to wrap async callbacks with RxJava. A few comments:

  • You're calling e.onComplete() in itemListChanged. This is wrong, as it ends your Observable sequence after the first emission. You might not need to call onComplete() at all, since the notifications come from an external source that never ends; otherwise, call it once, when the notifications really end (the external source has finished producing items). Do not confuse it with unsubscribing from the source.
  • For unsubscription to actually do something, you must define it inside your create(): call e.setCancellable() with your cancellation logic (mMyLib.setListener(null) and any additional resource clean-up code).
  • It seems that in this case you have only one subscriber; otherwise, consider using share() to get a 'hot' Observable that can multicast to several subscribers.
  • As for the Nucleus library, as far as I remember restartableReplay will cache your Observable and replay emitted items. This might be wrong for this kind of 'hot' stream of events (unless you need to replay, say, the last emission). Additionally, caching will be problematic as you will lose the ability to unsubscribe, so make sure you are using Nucleus correctly here.
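The first two points can be put together in a minimal sketch. The MyLib class below is only a hypothetical stand-in for your library (its simulateChange() and hasListener() helpers exist just so the sketch can be exercised), and the item type is simplified to String. The listener is registered when someone subscribes, onComplete() is never called, and setCancellable() removes the listener on dispose:

```java
import io.reactivex.Observable;

import java.util.List;

public class ListenerBridge {

    // Hypothetical stand-in for the real library: one listener slot plus startDiscovery().
    public static class MyLib {
        public interface Listener {
            void itemListChanged(List<String> items);
        }

        private Listener listener;

        public void setListener(Listener listener) {
            this.listener = listener;
        }

        public boolean hasListener() {
            return listener != null;
        }

        public void startDiscovery() {
            // The real library would start producing notifications here.
        }

        // Helper for trying the sketch out: pretends the library reported a new list.
        public void simulateChange(List<String> items) {
            if (listener != null) {
                listener.itemListChanged(items);
            }
        }
    }

    // Cold Observable: the listener is registered on subscribe and removed on dispose.
    public static Observable<List<String>> itemListChanges(MyLib lib) {
        return Observable.create(emitter -> {
            // Runs when the subscriber disposes, so the library stops calling us.
            emitter.setCancellable(() -> lib.setListener(null));
            // Every callback becomes one onNext(); no onComplete(), the source never ends.
            lib.setListener(emitter::onNext);
            lib.startDiscovery();
        });
    }
}
```

Because this sketch assumes the library has a single listener slot, a second subscriber would replace the first one's listener; that is the situation where appending share() to the returned Observable becomes useful.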

Where should I unsubscribe if I want to receive modifications of the list as long as my activity / presenter are alive? Am I even using the right syntax/ objects?

You simply need to unsubscribe wherever you want to stop getting notifications; whether that is in onStop() or onDestroy() is up to you.
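One common way to tie this to a lifecycle is to collect subscriptions in a CompositeDisposable and clear it in the matching callback. This is a rough sketch in plain Java rather than Nucleus or Android code: ItemsScreen, attach() and detach() are made-up names standing in for your presenter or activity and its lifecycle methods.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;

import java.util.ArrayList;
import java.util.List;

public class ItemsScreen {

    // Holds every active subscription so they can all be disposed together.
    private final CompositeDisposable disposables = new CompositeDisposable();
    private final List<String> shown = new ArrayList<>();

    // Would be called from onCreate() or onStart().
    public void attach(Observable<String> updates) {
        disposables.add(updates.subscribe(shown::add));
    }

    // Would be called from the matching onDestroy() or onStop().
    public void detach() {
        // clear() disposes the current subscriptions but keeps the container reusable.
        disposables.clear();
    }

    public List<String> shown() {
        return shown;
    }
}
```

After detach(), later emissions from the source no longer reach the screen, and if the source was built with setCancellable(), its clean-up logic runs at that moment.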

Upvotes: 1
