Prashant Pandey

Reputation: 4642

What Is Meant By 'Subscriber Thread' in RxJava

I was reading a book on RxJava, and here's an excerpt from it:

Observable.create(s -> {
   ... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted");

The author writes that since the operation is async, values will be emitted on a thread different from the subscriber's. Since the subscriber's thread is not blocked, "Will print BEFORE values are emitted" is printed before any value is pushed to the subscriber.

I cannot get my head around what this subscriber thread is. Can I get its name somehow? To test what the author is saying, I wrote some code:

Observable.create(
            subscriber -> {
              System.out.println("Entering thread: " + Thread.currentThread().getName());
              Thread t =
                  new Thread(
                      new Runnable() {
                        @Override
                        public void run() {
                          try {
                            System.out.println(
                                "Emitting threads: " + Thread.currentThread().getName());
                            Thread.sleep(5000);
                            subscriber.onNext(1);
                            subscriber.onNext(2);
                            subscriber.onNext(3);
                            subscriber.onCompleted();
                          } catch (InterruptedException e) {
                            subscriber.onError(e);
                          }
                        }
                      });
              t.start();
            })
        .subscribe(
            a -> {
              System.out.print("Subscriber thread: " + Thread.currentThread().getName() + " ");
              System.out.println(a);
            });
    System.out.println("Main thread exiting:");

When I run the above code, the subscriber thread turns out to be the same thread on which onNext() is called. Since this is an asynchronous operation, the items should have been emitted on a thread different from the subscriber's. How come both threads are the same?

Further, even after the main thread exits, the program keeps running and terminates only after all the elements have been pushed down via onNext(). Why is this happening? Why isn't the program exiting?

Upvotes: 1

Views: 208

Answers (2)

akarnokd

Reputation: 69997

The subscriber thread is the thread that calls subscribe(). Since an RxJava flow is a chain of such subscribe() calls, these calls can be moved to other threads within the sequence via subscribeOn().

For example:

Observable.fromCallable(() -> Thread.currentThread())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

This will print the subscriber thread twice. Alternatively:

Observable.fromCallable(() -> Thread.currentThread())
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

Thread.sleep(1000);

This will print the subscriber thread and the RxSingleScheduler thread.
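
The Thread.sleep(1000) in the second snippet is there because, as far as I know, RxJava's built-in schedulers run on daemon threads, and the JVM does not wait for daemon threads before exiting. The Thread created by hand in the question is a non-daemon thread, which is why that program keeps running after main returns. Here is a minimal plain-Java sketch of the difference. DaemonSketch, newWorker and the delays are hypothetical names and values chosen for illustration, and no RxJava is involved:

```java
public class DaemonSketch {

    // Hypothetical helper: builds a worker that "emits" after delayMillis.
    static Thread newWorker(boolean daemon, long delayMillis) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                System.out.println("Emitting on " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, daemon ? "daemon-worker" : "user-worker");
        // Must be set before start(). The JVM exits once only daemon threads remain.
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) {
        // Non-daemon: the JVM waits for this thread to finish, like the
        // hand-made thread in the question.
        newWorker(false, 500).start();
        // Daemon with a longer delay: it is abandoned when the last
        // non-daemon thread ends, so it should not get the chance to print.
        newWorker(true, 5000).start();
        System.out.println("Main thread exiting");
    }
}
```

If the emitting thread in the question were marked with t.setDaemon(true) before t.start(), the program would be free to exit as soon as main returns, and the values would most likely never be printed.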

Upvotes: 1

Sai prateek

Reputation: 11896

By default, Rx is single-threaded, which implies that an Observable and the chain of operators applied to it will notify its observers on the same thread on which its subscribe() method is called, unless the source itself emits from another thread.
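
To see why the callback in the question runs on the emitting thread, it can help to strip the idea down to plain Java. The sketch below is not RxJava: subscribe here is a hypothetical stand-in that just runs the source function on the calling thread, and the class and method names are made up for illustration. The point it shows is that a callback has no thread of its own and runs on whichever thread invokes it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

public class CallbackThreadSketch {

    // Hypothetical stand-in for Observable.create(...).subscribe(...):
    // the source function runs on the thread that calls subscribe.
    static void subscribe(Consumer<IntConsumer> source, IntConsumer onNext) {
        source.accept(onNext);
    }

    // Synchronous source: the callback runs on the caller's thread.
    static List<String> runSynchronous() {
        List<String> seen = new CopyOnWriteArrayList<>();
        subscribe(
            emitter -> emitter.accept(1),
            value -> seen.add(Thread.currentThread().getName()));
        return seen;
    }

    // Asynchronous source: it starts its own thread and emits from there,
    // so the callback runs on that thread, not on the caller's.
    static List<String> runAsynchronous() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Thread[] worker = new Thread[1];
        subscribe(
            emitter -> {
                worker[0] = new Thread(() -> emitter.accept(1), "emitter-thread");
                worker[0].start();
            },
            value -> seen.add(Thread.currentThread().getName()));
        try {
            worker[0].join(); // wait so the result is complete before returning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("Synchronous callback ran on: " + runSynchronous());
        System.out.println("Asynchronous callback ran on: " + runAsynchronous());
    }
}
```

In the question's code the source behaves like runAsynchronous: subscribe() is called on main and returns immediately, while onNext() is invoked later from the hand-made thread, so the lambda passed to subscribe() reports that thread's name.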

Upvotes: 0
