user212942

Reputation: 317

What is the difference between Observable.create and fromPublisher?

The code below works as written, but if I change Observable.create to Observable.fromPublisher it stops working: without sample, every item still reaches the subscriber, but with sample in the chain, nothing is delivered at all.


import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class SampleMain {
    public static void main(String[] args) {
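        // s is an ObservableEmitter here; with fromPublisher it would be a raw Subscriber.
        Observable<String> o = Observable.create(s -> {
            // Emit from a separate thread so that subscribe() returns immediately.
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    s.onNext("Hello Observable.fromPublisher() A" + i);
                    s.onNext("Hello Observable.fromPublisher() B" + i);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                s.onComplete();
            }).start();
        });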
        o
                .sample(1, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println);
    }
}

Upvotes: 0

Views: 595

Answers (1)

akarnokd

Reputation: 70007

fromPublisher requires a properly implemented org.reactivestreams.Publisher that honors the rules of the Reactive-Streams specification. Such Publishers typically come from third-party libraries or APIs.

create has built-in infrastructure that converts a simpler emitter-style API into an Observable, so the developer doesn't have to worry as much about the underlying protocol.
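To give a rough idea of what "built-in infrastructure" means, here is a minimal plain-Java sketch of an emitter-style bridge. It is not RxJava's actual implementation: Sink, GuardedEmitter, and run are hypothetical names invented for this illustration. It only shows the kind of bookkeeping (dropping signals after completion or disposal) that the emitter passed to create takes care of.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EmitterBridgeSketch {

    /** Hypothetical downstream consumer, a stand-in for an Observer. */
    interface Sink<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical emitter handed to user code; it shields the sink from late signals. */
    static final class GuardedEmitter<T> {
        private final Sink<T> sink;
        private volatile boolean done;

        GuardedEmitter(Sink<T> sink) {
            this.sink = sink;
        }

        void onNext(T item) {
            if (!done) {            // items after completion or disposal are dropped
                sink.onNext(item);
            }
        }

        void onComplete() {
            if (!done) {            // only the first terminal signal gets through
                done = true;
                sink.onComplete();
            }
        }

        void dispose() {
            done = true;
        }

        boolean isDisposed() {
            return done;
        }
    }

    /** Runs the user-supplied source against a recording sink and returns what arrived. */
    static List<String> run(Consumer<GuardedEmitter<String>> source) {
        List<String> events = new ArrayList<>();
        GuardedEmitter<String> emitter = new GuardedEmitter<>(new Sink<String>() {
            @Override public void onNext(String item) { events.add(item); }
            @Override public void onComplete() { events.add("complete"); }
        });
        source.accept(emitter);
        return events;
    }

    public static void main(String[] args) {
        // The source misbehaves after completing; the guard hides that from the sink.
        System.out.println(run(e -> {
            e.onNext("A");
            e.onComplete();
            e.onNext("late");
            e.onComplete();
        })); // prints [A, complete]
    }
}
```

The user code only calls onNext and onComplete; everything protocol-related lives in the wrapper, which is the division of labor create offers.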

Also may I direct your attention to the javadoc of fromPublisher:

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use create(ObservableOnSubscribe) to create a source-like Observable instead.

Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.
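To illustrate the state management the note refers to, here is a minimal sketch of a Publisher that tracks per-subscriber state. To stay free of external dependencies it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones; RangePublisher, RangeSubscription, and record are hypothetical names for this example, and the sketch is not a fully verified, specification-complete implementation. A lambda such as s -> { ... } has none of this: it never calls onSubscribe and ignores request and cancel, which is likely why an operator like sample, which starts its timer in onSubscribe, stays silent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

class StatefulPublisherSketch {

    /** Hypothetical publisher emitting 0..count-1, never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // onSubscribe must be the first signal; every subscriber gets its own state.
            subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        }
    }

    /** The per-subscriber state that a stateless lambda has no place to keep. */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private final AtomicLong requested = new AtomicLong();
        private volatile boolean done;   // set on cancel, error, or completion
        private int next;                // touched only by the thread that is draining

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            // Add the demand, capping at Long.MAX_VALUE; only the 0 -> n transition drains,
            // so reentrant request() calls made from inside onNext return right here.
            long previous = requested.getAndAccumulate(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
            if (previous != 0) {
                return;
            }
            long emitted = 0;
            long r = requested.get();
            for (;;) {
                while (emitted != r && next != count) {
                    if (done) {
                        return;
                    }
                    subscriber.onNext(next++);
                    emitted++;
                }
                if (done) {
                    return;
                }
                if (next == count) {
                    done = true;
                    subscriber.onComplete();
                    return;
                }
                r = requested.get();
                if (r == emitted) {
                    // No new demand seen yet: give back what was consumed and maybe stop.
                    r = requested.addAndGet(-emitted);
                    if (r == 0) {
                        return;
                    }
                    emitted = 0;
                }
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Subscribes a recording subscriber and returns the signals it received. */
    static List<String> record(int count, long initialRequest, boolean requestOnePerItem) {
        List<String> events = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(initialRequest);
            }
            @Override public void onNext(Integer item) {
                events.add(String.valueOf(item));
                if (requestOnePerItem) {
                    subscription.request(1);
                }
            }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(3, 1, true));   // prints [0, 1, 2, complete]
        System.out.println(record(5, 2, false));  // prints [0, 1]
    }
}
```

The second call shows demand being honored: only two items were requested, so only two are emitted and no completion is signalled.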

Upvotes: 1
