paul

Reputation: 13471

Run Flow in main thread

I'm again comparing RxJava with the Java 9 Flow API. I see that Flow is asynchronous by default, and I was wondering whether there is a way to make it run synchronously.

Sometimes we want to use it not for NIO but as syntactic sugar, to keep the code more homogeneous.

RxJava is synchronous by default, and you can make it run asynchronously by using observeOn and subscribeOn in your pipeline.

Is there any operator in Flow to make it run on the main thread?

Regards.

Upvotes: 5

Views: 458

Answers (3)

akarnokd

Reputation: 69997

It depends on what you mean by running on the main thread.

If you want to force an arbitrary Flow to execute on a specific thread, there is no standard way of doing it unless the Flow is implemented in a library that lets you override the asynchrony-providing parts. In RxJava terms, these are the Schedulers provided by the Schedulers utility class.

If you want to observe a Flow on the main thread, you have to write a blocking queue consumer on top of the Flow.Subscriber that blocks the thread until the queue has items. This can get complicated, so I'll refer you to the blockingSubscribe implementation in Reactive4JavaFlow.
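To make the blocking-queue idea concrete, here is a minimal sketch using only JDK classes. BlockingQueueSubscriber, blockingForEach and demo are hypothetical names chosen for illustration; this is not the Reactive4JavaFlow implementation, and it assumes unbounded demand and an unbounded queue to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Hypothetical helper, not part of the JDK or of Reactive4JavaFlow.
final class BlockingQueueSubscriber<T> implements Flow.Subscriber<T> {
    private static final Object DONE = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile Throwable error;

    @Override public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE); // unbounded demand keeps the sketch short
    }

    // The three signals below arrive on whatever thread the publisher uses.
    @Override public void onNext(T item) { queue.add(item); }
    @Override public void onError(Throwable t) { error = t; queue.add(DONE); }
    @Override public void onComplete() { queue.add(DONE); }

    // Blocks the calling thread and runs the action there for each item
    // until the flow terminates.
    @SuppressWarnings("unchecked")
    void blockingForEach(Consumer<? super T> action) {
        try {
            for (Object next = queue.take(); next != DONE; next = queue.take()) {
                action.accept((T) next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (error != null) throw new IllegalStateException(error);
    }

    // Publishes three items asynchronously, then observes them on the caller.
    static List<String> demo() {
        BlockingQueueSubscriber<Integer> subscriber = new BlockingQueueSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
        } // close() leads to onComplete once the buffered items are delivered
        List<String> log = new ArrayList<>();
        subscriber.blockingForEach(i -> log.add(i + "@" + Thread.currentThread().getName()));
        return log;
    }
}
```

Calling BlockingQueueSubscriber.demo() from main should record every item together with the name of the calling thread, even though SubmissionPublisher delivers the signals from its own executor.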

If you want to use the Java main thread as an Executor/Scheduler, that's even more complicated and needs a similar blocking mechanism as well as some ideas from a thread-pool executor. Reactive4JavaFlow happens to have such a Scheduler, which you can use as an Executor via: new SubmissionPublisher<>(blockingScheduler::schedule, 128).
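A rough sketch of the "calling thread as Executor" idea with plain JDK classes could look like this. CallerThreadExecutor, runUntilStopped, stop and demo are hypothetical names, and this is a simplification under stated assumptions rather than Reactive4JavaFlow's blocking scheduler: tasks are queued by execute() and only run when the owning thread enters the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical executor: queues tasks and runs them on the thread that
// calls runUntilStopped().
final class CallerThreadExecutor implements Executor {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile boolean stopped;

    @Override public void execute(Runnable task) { tasks.add(task); }

    // Asks the loop to exit; the no-op task wakes up a blocked take().
    void stop() {
        stopped = true;
        tasks.add(() -> { });
    }

    // Blocks the calling thread, running queued tasks until stop() is called.
    void runUntilStopped() {
        try {
            while (!stopped) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Assumes fewer items than the buffer capacity (128): submit() would
    // otherwise block, and nothing drains the buffer before the loop starts.
    static List<String> demo(List<Integer> items) {
        CallerThreadExecutor loop = new CallerThreadExecutor();
        List<String> log = new ArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(loop, 128);
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) {
                log.add(item + "@" + Thread.currentThread().getName());
            }
            @Override public void onError(Throwable t) { loop.stop(); }
            @Override public void onComplete() { loop.stop(); }
        });
        items.forEach(publisher::submit);
        publisher.close();
        loop.runUntilStopped(); // subscriber signals are delivered here
        return log;
    }
}
```

The design choice is that the subscriber itself stops the loop on termination, so the calling thread blocks only for as long as the flow is active.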

Upvotes: 2

M A

Reputation: 72864

There is no operator to do so, but the API lets you control how items are published. You can therefore just call the subscriber methods directly from the current thread.

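import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

class SynchronousPublisher implements Publisher<Data> {
    // Signals the subscriber on the thread that called subscribe(); no executor is involved.
    public synchronized void subscribe(Subscriber<? super Data> subscriber) {
        subscriber.onSubscribe(new SynchronousSubscription(subscriber));
    }

    static class SynchronousSubscription implements Subscription {
        private final Subscriber<? super Data> subscriber;

        SynchronousSubscription(Subscriber<? super Data> subscriber) {
            this.subscriber = subscriber;
        }

        // Runs on the caller's thread, so onNext is delivered synchronously.
        public synchronized void request(long n) {
            ... // prepare item
            subscriber.onNext(someItem);
        }

        ...
    }
}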
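For a version that compiles as is, here is a small sketch of the same approach that emits the elements of an Iterable directly on the requesting thread. SyncIterablePublisher is a hypothetical name, and the sketch assumes the subscription is only used from one thread; it tracks demand and guards against re-entrant request() calls made from inside onNext.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;

// Hypothetical publisher: every signal is delivered on the thread that
// calls subscribe() or request(). Not thread-safe by design.
final class SyncIterablePublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    SyncIterablePublisher(Iterable<T> source) { this.source = source; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new SyncSubscription<>(source.iterator(), subscriber));
    }

    private static final class SyncSubscription<T> implements Flow.Subscription {
        private final Iterator<T> items;
        private final Flow.Subscriber<? super T> subscriber;
        private long requested;   // outstanding demand
        private boolean emitting; // true while the drain loop is running
        private boolean done;     // set on completion, error or cancel

        SyncSubscription(Iterator<T> items, Flow.Subscriber<? super T> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            long sum = requested + n;
            requested = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
            if (emitting) return; // re-entrant call; the outer loop picks up the demand
            emitting = true;
            try {
                while (!done && requested > 0 && items.hasNext()) {
                    requested--;
                    subscriber.onNext(items.next());
                }
                if (!done && !items.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            } finally {
                emitting = false;
            }
        }

        @Override
        public void cancel() { done = true; }
    }
}
```

With this sketch, by the time subscribe() returns, a subscriber that keeps requesting has already received every item and onComplete on the calling thread.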

Upvotes: 3

Naman

Reputation: 31918

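You can define your own Publisher, starting from the example documented in Flow, to get synchronous execution. Note that the documented example, reproduced here, still dispatches its signals through an executor, so delivery is only synchronous once you replace that executor or call the subscriber directly.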

A very simple publisher that only issues (when requested) a single TRUE item to a single subscriber. Because the subscriber receives only a single item, this class does not use buffering and ordering control.

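import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class OneShotPublisher implements Publisher<Boolean> {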
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }
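Since the example above hands its signals to ForkJoinPool.commonPool(), they arrive on a pool thread. One way to keep them on the calling thread, sketched here under assumptions, is to accept a plain Executor and pass Runnable::run, which runs each task immediately on the caller. DirectOneShotPublisher and onCallerThread are hypothetical names, and the Future-based cancellation of the original is dropped because a plain Executor does not return one.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

// Hypothetical variant of the documented one-shot publisher whose executor
// can be swapped; with Runnable::run every signal runs on the caller's thread.
final class DirectOneShotPublisher implements Flow.Publisher<Boolean> {
    private final Executor executor;
    private boolean subscribed; // true after first subscribe

    DirectOneShotPublisher(Executor executor) { this.executor = executor; }

    // Same-thread configuration: execute(task) simply calls task.run().
    static DirectOneShotPublisher onCallerThread() {
        return new DirectOneShotPublisher(Runnable::run);
    }

    @Override
    public synchronized void subscribe(Flow.Subscriber<? super Boolean> subscriber) {
        if (subscribed) {
            subscriber.onError(new IllegalStateException()); // only one allowed
        } else {
            subscribed = true;
            subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
        }
    }

    private static final class OneShotSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Boolean> subscriber;
        private final Executor executor;
        private boolean completed;

        OneShotSubscription(Flow.Subscriber<? super Boolean> subscriber, Executor executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        @Override
        public synchronized void request(long n) {
            if (n != 0 && !completed) {
                completed = true;
                if (n < 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    executor.execute(() -> {
                        subscriber.onNext(Boolean.TRUE);
                        subscriber.onComplete();
                    });
                }
            }
        }

        @Override
        public synchronized void cancel() {
            completed = true; // nothing in flight to cancel when running directly
        }
    }
}
```

Passing a different Executor to the constructor restores asynchronous delivery, so the same class covers both modes.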

Upvotes: 4
