Reputation: 317
Changing Observable.create to Observable.fromPublisher in the source below does not work. (Without sample, every item reaches the subscriber; with sample, nothing is printed at all.)
What is the difference between Observable.create and fromPublisher?
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class SampleMain {
    public static void main(String[] args) {
        Observable<String> o = Observable.create(s -> {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    s.onNext("Hello Observable.fromPublisher() A" + i);
                    s.onNext("Hello Observable.fromPublisher() B" + i);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                s.onComplete();
            }).start();
        });
        o
            .sample(1, TimeUnit.MILLISECONDS)
            .subscribe(System.out::println);
    }
}
Upvotes: 0
Views: 595
Reputation: 70007
fromPublisher requires a properly implemented org.reactivestreams.Publisher that honors the Reactive Streams rules. Such Publishers typically come from 3rd party libraries or APIs.
create has built-in infrastructure to convert a simpler emitter-style API to an Observable so that the developer doesn't have to worry as much about the underlying protocol.
Also, may I direct your attention to the javadoc of fromPublisher:
The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.
If possible, use create(ObservableOnSubscribe) to create a source-like Observable instead.
Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.
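To make the "state management" point concrete, here is a minimal single-threaded sketch. It deliberately uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for org.reactivestreams, since they have the same shape and need no extra dependency; that substitution is an assumption of this example, not something RxJava 2 does for you. All class and method names (PublisherSketch, RecordingSubscriber, lambdaPublisher, RangePublisher, RangeSubscription) are hypothetical. RecordingSubscriber only becomes active in onSubscribe, loosely imitating an operator that starts its work there, which may be why sample prints nothing when the lambda never calls onSubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class PublisherSketch {

    /** Hypothetical subscriber that only becomes active once onSubscribe has been called. */
    public static final class RecordingSubscriber implements Flow.Subscriber<String> {
        public final List<String> events = new ArrayList<>();
        private boolean started; // set only in onSubscribe

        @Override public void onSubscribe(Flow.Subscription s) {
            started = true;
            events.add("onSubscribe");
            s.request(Long.MAX_VALUE); // ask for everything
        }
        @Override public void onNext(String item) {
            if (started) events.add("onNext " + item); // signals before onSubscribe are dropped
        }
        @Override public void onError(Throwable t) { events.add("onError"); }
        @Override public void onComplete() { if (started) events.add("onComplete"); }
    }

    /** Stateless lambda: never calls onSubscribe, ignores demand and cancellation. */
    public static Flow.Publisher<String> lambdaPublisher() {
        return subscriber -> {
            subscriber.onNext("A");
            subscriber.onNext("B");
            subscriber.onComplete();
        };
    }

    /** Publisher that hands every subscriber its own stateful Subscription. */
    public static final class RangePublisher implements Flow.Publisher<String> {
        private final int count;
        public RangePublisher(int count) { this.count = count; }

        @Override public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        }
    }

    /** Per-subscriber state: position, outstanding demand, cancellation (not thread-safe). */
    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super String> downstream;
        private final int count;
        private int index;
        private long requested;
        private boolean emitting;
        private boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super String> downstream, int count) {
            this.downstream = downstream;
            this.count = count;
        }

        @Override public void request(long n) {
            if (cancelled) return;
            if (n <= 0) {
                cancelled = true;
                downstream.onError(new IllegalArgumentException("n must be positive"));
                return;
            }
            long sum = requested + n;
            requested = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
            if (emitting) return; // re-entrant call from onNext; the outer loop sees the new demand
            emitting = true;
            while (!cancelled && requested > 0 && index < count) {
                requested--;
                downstream.onNext("item " + index++);
            }
            emitting = false;
            if (!cancelled && index == count) {
                cancelled = true; // terminal state: later requests are no-ops
                downstream.onComplete();
            }
        }

        @Override public void cancel() { cancelled = true; }
    }

    /** Subscribes a fresh RecordingSubscriber and returns what it observed. */
    public static List<String> run(Flow.Publisher<String> publisher) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        publisher.subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run(lambdaPublisher()));     // prints []
        System.out.println(run(new RangePublisher(2))); // prints [onSubscribe, onNext item 0, onNext item 1, onComplete]
    }
}
```

The lambda has nowhere to keep the per-subscriber position, demand, or cancelled flag, so it cannot follow the protocol. With Observable.create you get an emitter and RxJava handles onSubscribe and disposal for you, which is why the javadoc recommends it for source-like Observables.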
Upvotes: 1