chhil

Reputation: 460

RxJava2: output of two separately subscribed observables and of the merge of the same observables differ

With Snippet1, I can see the sysout from both subscribers.
With Snippet2, I don't see any output from the second observable.
Why is the merge not working for me?

Snippet1

x = createQ2Flowable().subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io())
        .filter(predicate -> !predicate.toString().contains("<log realm=\"\""))
        .subscribe(onNext -> System.out.println("Q2->" + onNext));

y = createMetricsFlowable().subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io())
        .subscribe(onNext -> System.out.println("metrics->" + onNext));  

Snippet2

createQ2Flowable().mergeWith(createMetricsFlowable())
.subscribeOn(Schedulers.computation())
.subscribe(onNext -> System.out.println(onNext));  

[edit]: Added flowable creators

private Flowable<String> createMetricsFlowable() {
    return Flowable.create(source -> {
        Space sp = SpaceFactory.getSpace("rxObservableFeeder");
        while (running()) {
            String line = (String) sp.in("RXTmFeeder");
            source.onNext(line);
        }

    }, BackpressureStrategy.BUFFER);

}

private Flowable<String> createQ2Flowable() {
    return Flowable.create(source -> {
        Space sp = SpaceFactory.getSpace("LoggerSpace");
        while (running()) {
            LogEvent line = (LogEvent) sp.in("rxLoggingKey");
            source.onNext(line.toString());

        }

    }, BackpressureStrategy.BUFFER);

}

Upvotes: 0

Views: 90

Answers (1)

akarnokd

Reputation: 70017

From the comments:

try

createQ2Flowable()
.subscribeOn(Schedulers.computation())      // <-------------------------
.mergeWith(createMetricsFlowable()
    .subscribeOn(Schedulers.computation())  // <-------------------------
)

Now I need to know why it happened

Given the detailed implementation, you have two synchronous Flowables. When you merge them, the first Flowable is subscribed to, starts emitting immediately, and never gives control back to mergeWith; therefore the second Flowable is never subscribed to.

The subscribeOn after mergeWith is not equivalent to the solution provided above. You have to explicitly subscribe both Flowables on a background thread. That moves each synchronous loop off the thread mergeWith uses for subscribing to its sources, so mergeWith can go on to subscribe to the second Flowable.
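
The blocking described above can be modelled without RxJava. What follows is a minimal plain-Java sketch, not RxJava's actual implementation: Source, merge, subscribeOn, bounded and looping are hypothetical stand-ins written for this illustration, and the sketch skips the serialization of emissions that the real merge operator performs. It assumes only that merging subscribes to its sources one after the other on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class MergeSubscribeSketch {

    /** Hypothetical stand-in for a Flowable: subscribe() runs the emitter body. */
    interface Source {
        void subscribe(Consumer<String> downstream);
    }

    /** Models merging: subscribes to each source in turn on the calling thread. */
    static Source merge(Source first, Source second) {
        return downstream -> {
            first.subscribe(downstream);   // returns only once a synchronous source is done
            second.subscribe(downstream);  // never reached if the first one loops forever
        };
    }

    /** Models subscribeOn: runs the source's subscribe() on another thread and returns at once. */
    static Source subscribeOn(Source source, ExecutorService executor) {
        return downstream -> executor.execute(() -> source.subscribe(downstream));
    }

    /** Bounded synchronous source, so the blocking case can be shown without hanging. */
    static Source bounded(String name, int count) {
        return downstream -> {
            for (int i = 0; i < count; i++) {
                downstream.accept(name + i);
            }
        };
    }

    /** Synchronous source that loops like while (running()) in the question. */
    static Source looping(String name, AtomicBoolean running) {
        return downstream -> {
            while (running.get()) {
                downstream.accept(name);
            }
        };
    }

    /** Both sources synchronous: the second is subscribed only after the first has finished. */
    static List<String> mergeSynchronous() {
        List<String> seen = new ArrayList<>();
        merge(bounded("Q2-", 2), bounded("metrics-", 2)).subscribe(seen::add);
        return seen;
    }

    /** Each looping source on its own thread: returns true once both have emitted. */
    static boolean mergeWithSubscribeOnEach() {
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch q2Seen = new CountDownLatch(1);
        CountDownLatch metricsSeen = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            merge(subscribeOn(looping("Q2", running), pool),
                  subscribeOn(looping("metrics", running), pool))
                .subscribe(item -> {
                    if (item.equals("Q2")) {
                        q2Seen.countDown();
                    } else {
                        metricsSeen.countDown();
                    }
                });
            return q2Seen.await(5, TimeUnit.SECONDS)
                && metricsSeen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            running.set(false);  // let both loops exit
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mergeSynchronous()); // [Q2-0, Q2-1, metrics-0, metrics-1]
        System.out.println(mergeWithSubscribeOnEach());
    }
}
```

In mergeSynchronous every Q2 item precedes every metrics item; if the first loop were unbounded, as in the question, the second subscribe call would never run. Wrapping only the merged source in subscribeOn would move that whole sequence to another thread without changing its order, which is why each source needs its own subscribeOn.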

Upvotes: 1
