Hohenheim

Reputation: 407

RxJava2 Publish

What is the difference between

ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}

and

ObservableTransformer {
    it.publish{ shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}

When I run my code with either of these two, I get the same results. What does publish do here?

Upvotes: 9

Views: 2493

Answers (2)

akarnokd

Reputation: 70017

The difference is that the top transformer subscribes to the upstream twice for a single subscription from the downstream. This duplicates any side effects of the upstream, which is usually not wanted:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

will print

Subscribed!
2
3
4
Subscribed!
A
B
C

The side effect represented here is the printout Subscribed! Depending on the actual work done in a real source, that could mean sending an email twice or retrieving the rows of a table twice. This particular example also shows that, even though the source values are interleaved by type, the output is grouped by type: all the integers come first, then all the strings.

In contrast, publish(Function) establishes one subscription to the source per end subscriber, so any side effects at the source happen only once.

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

which prints

Subscribed!
A
2
B
3
C
4

because the source is subscribed once and each item is multicast to the two "arms" of the .ofType().compose().
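
To make the difference concrete without pulling in RxJava, here is a minimal plain-Java sketch of the same idea. It is only a toy model and not how RxJava implements publish: MulticastSketch and its method names are hypothetical, the "source" is a static method that logs Subscribed! and replays the six items on every call, and the two arms are ordinary methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: this is NOT RxJava, just plain Java that mimics the idea.
class MulticastSketch {

    // A "cold" source: every call re-runs the side effect and replays all items.
    static void subscribe(List<String> log, Consumer<Object> observer) {
        log.add("Subscribed!");
        for (Object item : new Object[] {"a", 1, "b", 2, "c", 3}) {
            observer.accept(item);
        }
    }

    // First arm: keep only integers and add one.
    static void integerArm(Object item, List<String> log) {
        if (item instanceof Integer) {
            log.add(String.valueOf(((Integer) item) + 1));
        }
    }

    // Second arm: keep only strings and upper-case them.
    static void stringArm(Object item, List<String> log) {
        if (item instanceof String) {
            log.add(((String) item).toUpperCase());
        }
    }

    // Like the merge-only transformer: each arm subscribes to the source itself.
    static List<String> subscribeTwice() {
        List<String> log = new ArrayList<>();
        subscribe(log, item -> integerArm(item, log));
        subscribe(log, item -> stringArm(item, log));
        return log;
    }

    // Like publish(Function): one subscription, each item is handed to both arms.
    static List<String> subscribeOnceAndMulticast() {
        List<String> log = new ArrayList<>();
        subscribe(log, item -> {
            integerArm(item, log);
            stringArm(item, log);
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice());
        System.out.println(subscribeOnceAndMulticast());
    }
}
```

In this model, subscribeTwice() logs Subscribed! twice with the values grouped by type, while subscribeOnceAndMulticast() logs it once with the values in source order, mirroring the two printouts above.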

Upvotes: 26

Sachin Chandil

Reputation: 17829

The publish operator, when called without arguments, converts your Observable into a Connectable Observable.

Let's see what a Connectable Observable means: suppose you want to subscribe to an observable multiple times and serve the same items to each subscriber. For that, you need a Connectable Observable.

Example (C#, Rx.NET):

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));

output:

first subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2

In this case, we are quick enough to subscribe before the first item is published, but only on the first subscription. The second subscription subscribes late and misses the first publication.

We could defer the call to Connect() until after all subscriptions have been made. That way, even with the call to Thread.Sleep, we do not actually subscribe to the underlying source until both subscriptions are in place. This would be done as follows:

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
observable.Connect();

output:

first subscription : 0 
second subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

So with a Connectable Observable, we have a way to control when the Observable starts emitting items.
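
Since the example above is in C#, here is a rough Java rendering of the same connect-timing idea. It is a toy model in plain Java rather than real Rx code: ConnectableSketch, tick() and the two static methods are made-up names, and tick() stands in for one timer interval elapsing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: not the Rx API, just the subscribe/connect idea in plain Java.
class ConnectableSketch {
    private final List<Consumer<Long>> observers = new ArrayList<>();
    private boolean connected = false;
    private long next = 0;

    // Registering an observer does not start the source.
    void subscribe(Consumer<Long> observer) {
        observers.add(observer);
    }

    // Only after connect() do items start to flow.
    void connect() {
        connected = true;
    }

    // Stands in for one interval elapsing; a no-op until connected.
    void tick() {
        if (!connected) {
            return;
        }
        long value = next++;
        for (Consumer<Long> observer : observers) {
            observer.accept(value);
        }
    }

    // Connect first: the second subscriber arrives late and misses value 0.
    static List<String> connectFirst() {
        List<String> log = new ArrayList<>();
        ConnectableSketch source = new ConnectableSketch();
        source.connect();
        source.subscribe(i -> log.add("first subscription : " + i));
        source.tick();
        source.subscribe(i -> log.add("second subscription : " + i));
        source.tick();
        return log;
    }

    // Connect last: both subscribers are in place, so both see every value.
    static List<String> connectLast() {
        List<String> log = new ArrayList<>();
        ConnectableSketch source = new ConnectableSketch();
        source.subscribe(i -> log.add("first subscription : " + i));
        source.tick(); // time passes, but nothing is emitted before connect()
        source.subscribe(i -> log.add("second subscription : " + i));
        source.connect();
        source.tick();
        source.tick();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(connectFirst());
        System.out.println(connectLast());
    }
}
```

In connectFirst() the second subscriber only sees value 1, while in connectLast() both subscribers start from value 0, which matches the two outputs shown above.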

Example taken from : http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect

EDIT: According to the 180th slide in this link:

Another property of publish is that if an observer starts observing 10 seconds after the observable started emitting items, it receives only the items emitted after those 10 seconds (that is, from the time of its subscription), not all the items. From the slides, as far as I understand, publish is used for UI events. It makes sense that an observer should receive only the events that occur after it has subscribed, not all the events that happened before.

Hope it helps.

Upvotes: 2
