SLearner
SLearner

Reputation: 1439

Make Single Observable emit more than once

When I receive a push notification, I add the notification payload to my DB in the following way:

personObject.insertObjectIntoDb(searchResult, value, oneOnOneChannel).observeOn(Schedulers.computation()).subscribe(insertSinglePersonSubscriber);

I have a Subscriber instantiated in the onCreate() of my activity.

insertSub = new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.d(TAG, "onCompleted: insertSub complete");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: insertSub error");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "onNext: insertSub next");
    }
};

On the first invocation of personObject.insertObjectIntoDb(), the onComplete() of insertSub is called, just as expected.

This would mean the now the subscription has ended and the subscriber gets unsubscribed.

However, I don't want to stop the subscription and want the insertSub callbacks to be called every time there's a new push notification.

I've been reading about share() for one observable and multiple subscribers but that's not what I want. I read about replay() but it the observable never stops emitting (oops). Changing Subscriber to Observer also didn't help and on more thorough reading I found that Subscriber inherits from Observer and the same rules apply (apart from the fact the with a subscriber we need to unsubscribe.

I am not sure if the observer observable stops emitting (after emitting once). How do I make the observable emit multiple times, whenever there's a new notification?

Also, what's the best way to re-establish the subscription?

Upvotes: 2

Views: 3132

Answers (3)

koperko
koperko

Reputation: 2487

First of all, according your description it seems that you don't have some basic concepts completely figured out.

Observers don't emit but rather receive emissions.

Singles can't emit more than once. They were designed to emit only one event and complete. If you don't want this kind of behavior, you need to use some other Observable type.

But Single's are not a problem here. To compose a stream that behaves like this, you need to think one level above. If you design a stream that receives signals from push notifications, you can react to each of them by subscribing to your Single and forward its emission back to the main stream of notifications. That way, you funnel all your emissions into one Observer, just like you described. This can be easily achieved with flatMap operator.

notificationStream
    .flatMap(notificationPayload -> 
         personObject
             .insertObjectIntoDb(/* Extract arguments from payload */)
             .subscribeOn(Schedulers.computation())
    )
    .subscribe(insertSinglePersonSubscriber)

notificationStream can be created either by using some library designed for it ( i.e. https://android-arsenal.com/details/1/3546 ), writing it yourself or using a Subject. The easiest way is definitely third one, although not cleanest. You just create a subject and call its onNext method right in the place where you subscribe to insert object Single right now.

When composing the stream to insert a value, I changed your observeOn operator to subscribeOn as I guess that you don't completely understand what each of those operators do.

The observeOn operator switches the scheduler on which are emissions handled from that point of the stream.

On the other hand, subscribeOn operator instructs observable to produce items on a particular scheduler. As I hope I can assume that you do the "heavy lifting" in the producing the emission in the Single itself -- that would be the insert itself -- you need to subscribe on a computation scheduler, not observe on it. If I am wrong about this and you need to do computation work in the observer, then add observeOn operator after flatMap on the notification stream.

RxJava is awesome, keep learning. But theoretical knowledge is important :-)

Upvotes: 4

SLearner
SLearner

Reputation: 1439

Answering this as I found a solution to the overall problem.

Basically, I wanted to make a notificationStream as @koperko mentioned in his answer. But he suggested creating a Subject (PublishSubject if I am not wrong).

But that solved only half of my problem. What I wanted to do was to take have a notification Stream that adds the notification to my DB and after it's inserted, update the UI by fetching the last inserted notification.

personObject.insertObjectIntoDb(person)
    .observeOn(Schedulers.computation())
    .switchMap(new Func1<Long, Observable<PersonObject>>() {
        @Override
        public Observable<PersonObject> call(Long aLong) {
            Log.d(TAG, "call: inserted into DB with ID " + aLong);
            Log.d(TAG, "call: Now fetching this item from the DB");
            return personObject.getPersonById(aLong).observeOn(Schedulers.computation());
        }
    }).subscribe(getSinglePersonFromDBSubscriber);

This not only saved me from having to create a Subject, but also saved me from worrying about trying to make Single's emit more than once, which as @koperko mentioned was incorrect. switchMap() was what I was looking for.

Upvotes: 0

paul
paul

Reputation: 13481

What about use relay. Relay is a subject except without the ability to call onComplete or onError

https://github.com/JakeWharton/RxRelay

Here you can see a practical example

https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

Upvotes: 0

Related Questions