Stimsoni
Stimsoni

Reputation: 3156

RXjava continuous streams of data

What is the right way in RxJava to Subscribe to an observable that continuously receives events from different sources at unknown times.

For example: Say we have tasks that are received from a server and the calls to the server could be started from a number of different areas (eg triggered by a push notification, a polling event, user interaction etc).

The only thing we care on the UI is that we are notified of the tasks that have been received. We don't care where they come from. I effectively want to start observing for the life of the Activity and the model updates the observer as required

I have implemented the below class that does what I want, but I'm not sure if it the right way or if RxJava already accounts for something like this.

This class effective creates on ConnectableObservable that can have many observers subscribe to the one Observable(ensure all observers get the same stream). One thing I have noticed is calling observeOn and subscribeOn can cause unexpected results when subscribing to a ConnectableObservable in this fashion which can be a problem as the class can't control who is using the ConnectableObservable.

public class ApiService {

private Emitter<String> myEmitter;
private ConnectableObservable myObservable;

public ApiService() {
    //Create an observable that is simply used to get the emitter.
    myObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            myEmitter = e;
        }
    }).publish();
    //connect must be called here to ensure we have an instance of the emitter
    // before we have any subscribers
    myObservable.connect();

}

/**
 * This method returns the observable that all observers will subscribe to
 *
 * @return
 */
public Observable<String> getObservable() {
    return myObservable;
}

/**
 * This method is used to simulate a value that has been received from
 * an unknown source
 *
 * @param value
 */
public void run(final String value) {
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            myEmitter.onNext(value);
            //api call
        }
    }).subscribe();
}

}

I'm also curious if there are any concerns with memory leaks doing it this way assuming each observer subscribing to the one Observer is disposed of at the appropriate time.

This is the refactored class after seeing Bob's answer

public class ApiService {

private PublishProcessor<String> myProcessor = PublishProcessor.create();

public void subscribe(Subscriber<String> subscriber) {
    myProcessor.subscribe(subscriber);
}

/**
 * This method is used to simulate a value that has been received from
 * an unknown source
 *
 * @param value
 */
public void run(final String value) {
    myProcessor.onNext(value);
}

}

Upvotes: 1

Views: 2104

Answers (1)

Bob Dalgleish
Bob Dalgleish

Reputation: 8227

Use a Subject (RxJava) or Processor (RxJava 2) to do the subscription. You would then subscribe the subject to each source observable. Eventually, you would subscribe to the subject and get the combined stream of emissions.

Alternatively, you could use a Relay to isolate the downstream observers from any onComplete() or onError() that might come from upstream. This is a better choice when any of the observables might complete before the others.

Upvotes: 2

Related Questions