Lovegiver

Reputation: 441

How to subscribe multiple Observers on a single Observable with RxJava?

I have an Observable<List<Event>> that I want to share among multiple Subscribers. Each Subscriber filters the Events and processes only the ones it is interested in.

The Observable<List<Event>> is created this way:

    @Override
    public List<Event> findNewEvents() {
        List<Event> results = new ArrayList<>();
        while(! fetchedEvents.isEmpty()) {
            results.add(fetchedEvents.poll());
        }
        return results;
    }

    @Override
    public Observable<List<Event>> findNewObservableEvents() {
        return Observable.just(findNewEvents());
    }

Here is the subscribing code:

            Observable<List<Event>> newEvents = reader.findNewObservableEvents();

            Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(risk::isForRiskApproval)
                    .subscribe(risk::approveRisk);

            Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFundabilityCheck)
                    .subscribe(funding::checkFundability);

            Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
                    .flatMap(Observable::just)
                    .filter(funding::isForFundingFund)
                    .subscribe(funding::fund);

I have tried newEvents.share() and also newEvents.publish().

When trying newEvents.create(), I need to supply an ObservableOnSubscribe object, but I don't understand how to obtain one.

What is the trick?

Upvotes: 0

Views: 903

Answers (1)

akarnokd

Reputation: 69997

If you don't want findNewObservableEvents to be consumed multiple times, use publish and, once all the subscribers have subscribed, call connect on the resulting ConnectableObservable:

ConnectableObservable<List<Event>> newEvents = reader.findNewObservableEvents().publish();

Disposable riskApproveRiskEventsDisposable =
        newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(risk::isForRiskApproval)
                .subscribe(risk::approveRisk);

Disposable fundingCheckFundabilityEventsDisposable =
        newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFundabilityCheck)
                .subscribe(funding::checkFundability);

Disposable fundingFundEventsDisposable =
        newEvents.flatMapIterable(riskEvents -> riskEvents)
                .flatMap(Observable::just)
                .filter(funding::isForFundingFund)
                .subscribe(funding::fund);

Disposable connection = newEvents.connect();
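
To make the ordering concrete (subscribe everyone first, then connect), here is a rough plain-Java sketch of the idea. It does not use RxJava and is not how RxJava implements publish/connect; the Multicast class, the drain helper and the "risk:"/"fund:" string events are all hypothetical stand-ins for ConnectableObservable, findNewEvents and Event. It only illustrates that the source is pulled once, at connect time, and that the single result is handed to every registered subscriber.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration of "subscribe first, connect afterwards".
// Hypothetical stand-in only; RxJava's ConnectableObservable is far richer.
class PublishConnectSketch {

    // Collects subscribers and pulls the source exactly once on connect().
    static final class Multicast<T> {
        private final Supplier<T> source;
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        Multicast(Supplier<T> source) {
            this.source = source;
        }

        // Registering a subscriber does not trigger the source.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // The source is evaluated once; every subscriber gets the same value.
        void connect() {
            T value = source.get();
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Stand-in for findNewEvents(): empties the queue into a list.
    static List<String> drain(Queue<String> queue) {
        List<String> results = new ArrayList<>();
        while (!queue.isEmpty()) {
            results.add(queue.poll());
        }
        return results;
    }

    // Returns what the "risk" and "fund" subscribers each received.
    static List<List<String>> demo() {
        Queue<String> fetched =
                new ArrayDeque<>(Arrays.asList("risk:1", "fund:2", "risk:3"));
        Multicast<List<String>> newEvents = new Multicast<>(() -> drain(fetched));

        List<String> risk = new ArrayList<>();
        List<String> fund = new ArrayList<>();

        // Each subscriber filters the shared list for its own events.
        newEvents.subscribe(events -> events.stream()
                .filter(e -> e.startsWith("risk:"))
                .forEach(risk::add));
        newEvents.subscribe(events -> events.stream()
                .filter(e -> e.startsWith("fund:"))
                .forEach(fund::add));

        // Nothing has been delivered yet; the queue is drained only here.
        newEvents.connect();
        return Arrays.asList(risk, fund);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[risk:1, risk:3], [fund:2]]
    }
}
```

If connect were called before the second subscribe in this sketch, the second subscriber would receive nothing, which is why connect comes last in the code above.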

Upvotes: 3
