Reputation: 13471
Again I'm comparing RxJava with the Java 9 Flow API. I see that Flow is asynchronous by default, and I was wondering if there's a way to make it run synchronously.
Sometimes we just want to use it not for NIO but as syntactic sugar, to get more homogeneous code.
RxJava is synchronous by default, and you can make it run asynchronously by using observeOn and subscribeOn in your pipeline.
Is there any operator in Flow to make it run on the main thread?
Regards.
Upvotes: 5
Views: 458
Reputation: 69997
It depends on what you mean by running on the main thread.
If you want to force an arbitrary Flow to execute on a specific thread, there is no standard way of doing it unless the Flow is implemented in a library that lets you override the asynchrony-providing parts. In RxJava terms, these are the Schedulers provided by the Schedulers utility class.
If you want to observe a Flow on the main thread, you have to write a blocking queue consumer on top of the Flow.Subscriber that blocks the thread until the queue has items. This can get complicated, so I'll refer you to the blockingSubscribe implementation in Reactive4JavaFlow.
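To make the blocking-queue idea concrete, here is a minimal sketch. It is not the Reactive4JavaFlow blockingSubscribe implementation; QueueingSubscriber, drain and collectOnCallingThread are hypothetical names made up for illustration, and the sketch requests unbounded demand, so it applies no backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

class BlockingDrainDemo {

    /** Hypothetical helper: buffers signals in a queue so another thread can consume them. */
    static final class QueueingSubscriber<T> implements Flow.Subscriber<T> {
        private static final Object COMPLETE = new Object(); // end-of-stream marker

        /** Wraps an error so it cannot be confused with a regular item. */
        private static final class Failure {
            final Throwable error;
            Failure(Throwable error) { this.error = error; }
        }

        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        @Override public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // unbounded demand: no backpressure in this sketch
        }
        @Override public void onNext(T item)       { queue.add(item); }
        @Override public void onError(Throwable t) { queue.add(new Failure(t)); }
        @Override public void onComplete()         { queue.add(COMPLETE); }

        /** Blocks the calling thread, passing each item to the consumer until the stream ends. */
        @SuppressWarnings("unchecked")
        void drain(Consumer<? super T> consumer) {
            try {
                while (true) {
                    Object signal = queue.take(); // blocks until a signal is available
                    if (signal == COMPLETE) return;
                    if (signal instanceof Failure) {
                        throw new IllegalStateException(((Failure) signal).error);
                    }
                    consumer.accept((T) signal);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    /** Publishes 1..3 asynchronously, then consumes them on the calling thread. */
    static List<Integer> collectOnCallingThread() {
        List<Integer> seen = new ArrayList<>();
        QueueingSubscriber<Integer> subscriber = new QueueingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) publisher.submit(i);
        } // close() signals onComplete once the buffered items are delivered
        subscriber.drain(seen::add); // the consumer runs here, on the caller's thread
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectOnCallingThread());
    }
}
```

The publisher still delivers on its own pool threads; only the consumer passed to drain runs on the thread that calls it, which is what "observing on the main thread" amounts to here.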
If you want to use the Java main thread as an Executor/Scheduler, that's even more complicated and needs a similar blocking mechanism as well as some ideas from a thread pool executor. Reactive4JavaFlow happens to have such a Scheduler, which you can use as an Executor via: new SubmissionPublisher<>(blockingScheduler::schedule, 128).
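If all you need is delivery on the thread that submits the items, a simpler option than a blocking scheduler is to hand SubmissionPublisher a same-thread Executor. The sketch below uses Runnable::run for that. This is a swapped-in technique rather than the Reactive4JavaFlow scheduler, and it relies on SubmissionPublisher running its delivery task through the supplied Executor, which is how the JDK implementation behaves as far as I know but which I would not treat as a documented guarantee. CallerRunsDemo and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class CallerRunsDemo {

    /** Submits 1..3 and records which thread each onNext ran on. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Runnable::run acts as an Executor that runs each task on the calling thread.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);
                }
                @Override public void onNext(Integer item) {
                    log.add(item + "@" + Thread.currentThread().getName());
                }
                @Override public void onError(Throwable t) { log.add("error"); }
                @Override public void onComplete() { log.add("done"); }
            });
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i); // with a same-thread executor, onNext runs before submit returns
            }
        } // close() triggers onComplete
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each logged entry should carry the name of the thread that called run(), since no other thread is involved.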
Upvotes: 2
Reputation: 72864
There is no operator to do so but the API allows you to control the way items are published. You can therefore just call the subscriber methods directly from the current thread.
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class SynchronousPublisher implements Publisher<Data> {
    public synchronized void subscribe(Subscriber<? super Data> subscriber) {
        // onSubscribe runs on the thread that calls subscribe()
        subscriber.onSubscribe(new SynchronousSubscription(subscriber));
    }

    static class SynchronousSubscription implements Subscription {
        private final Subscriber<? super Data> subscriber;

        SynchronousSubscription(Subscriber<? super Data> subscriber) {
            this.subscriber = subscriber;
        }

        public synchronized void request(long n) {
            ... // prepare item
            subscriber.onNext(someItem); // onNext runs on the thread that calls request()
        }
        ...
    }
}
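For a version of this idea that compiles and runs, here is a rough sketch of a publisher that emits a range of integers directly from request(). SyncRangePublisher and SyncRangeDemo are hypothetical names, the subscription assumes request() and cancel() are called from a single thread, and the emitting flag is there only so that a subscriber calling request() from inside onNext does not recurse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical publisher that emits [start, start + count) on the requesting thread. */
class SyncRangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    SyncRangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
    }

    /** Not thread-safe: assumes request() and cancel() come from one thread. */
    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int end;
        private int next;
        private long demand;
        private boolean emitting; // true while the loop below is running
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int start, int end) {
            this.subscriber = subscriber;
            this.next = start;
            this.end = end;
        }

        @Override public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            if (emitting) return; // re-entrant call from onNext: the outer loop sees the new demand
            emitting = true;
            try {
                while (demand > 0 && !done) {
                    if (next < end) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    if (next >= end && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            } finally {
                emitting = false;
            }
        }

        @Override public void cancel() { done = true; }
    }
}

class SyncRangeDemo {
    /** Subscribes with a one-at-a-time subscriber and records the order of events. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new SyncRangePublisher(1, 3).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }
            @Override public void onNext(Integer item) {
                events.add("next " + item);
                subscription.request(1);
            }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        events.add("subscribe() returned");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because everything happens inside the calls to subscribe() and request(), all items and the completion signal arrive before subscribe() returns to the caller.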
Upvotes: 3
Reputation: 31918
You can define your own custom Publisher, as documented in Flow, for synchronous execution.
A very simple publisher that only issues (when requested) a single TRUE item to a single subscriber. Because the subscriber receives only a single item, this class does not use buffering and ordering control.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class OneShotPublisher implements Publisher<Boolean> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private boolean subscribed; // true after first subscribe

    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
        if (subscribed)
            subscriber.onError(new IllegalStateException()); // only one allowed
        else {
            subscribed = true;
            subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
        }
    }

    static class OneShotSubscription implements Subscription {
        private final Subscriber<? super Boolean> subscriber;
        private final ExecutorService executor;
        private Future<?> future; // to allow cancellation
        private boolean completed;

        OneShotSubscription(Subscriber<? super Boolean> subscriber,
                            ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        public synchronized void request(long n) {
            if (n != 0 && !completed) {
                completed = true;
                if (n < 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    // onNext and onComplete run on an executor thread, not on the caller
                    future = executor.submit(() -> {
                        subscriber.onNext(Boolean.TRUE);
                        subscriber.onComplete();
                    });
                }
            }
        }

        public synchronized void cancel() {
            completed = true;
            if (future != null) future.cancel(false);
        }
    }
}
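Note that the example as quoted still hands delivery off to ForkJoinPool.commonPool(), so the subscriber is called on a pool thread. A same-thread variant could drop the executor and call the subscriber directly, roughly as sketched below. SyncOneShotPublisher and SyncOneShotDemo are hypothetical names, and treating any non-positive request as an error is my own choice rather than something taken from the quoted code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

/** Hypothetical same-thread variant of OneShotPublisher: no executor, no Future. */
class SyncOneShotPublisher implements Publisher<Boolean> {
    private boolean subscribed; // true after first subscribe

    @Override public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
        if (subscribed) {
            subscriber.onError(new IllegalStateException()); // only one allowed
        } else {
            subscribed = true;
            subscriber.onSubscribe(new SyncOneShotSubscription(subscriber));
        }
    }

    static final class SyncOneShotSubscription implements Subscription {
        private final Subscriber<? super Boolean> subscriber;
        private boolean completed;

        SyncOneShotSubscription(Subscriber<? super Boolean> subscriber) {
            this.subscriber = subscriber;
        }

        @Override public synchronized void request(long n) {
            if (completed) return;
            completed = true;
            if (n <= 0) {
                // assumption of this sketch: non-positive demand is reported as an error
                subscriber.onError(new IllegalArgumentException());
            } else {
                // called directly, so both signals run on the requesting thread
                subscriber.onNext(Boolean.TRUE);
                subscriber.onComplete();
            }
        }

        @Override public synchronized void cancel() {
            completed = true;
        }
    }
}

class SyncOneShotDemo {
    /** Records the order in which signals arrive relative to subscribe() returning. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new SyncOneShotPublisher().subscribe(new Subscriber<Boolean>() {
            @Override public void onSubscribe(Subscription s) { s.request(1); }
            @Override public void onNext(Boolean item) { events.add("next " + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        events.add("subscribe() returned");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```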
Upvotes: 4