user1189332
user1189332

Reputation: 1941

Spring Reactor Thread Model

Newbie alert to Spring Webflux (v 2.0.1.RELEASE).

I'd like to use Spring Webflux for a back end (Webless) application for processing a large amount of data from a JMS listener.

My understanding is Spring Webflux provides an ]non-blocking/async concurrency model. However, I got a basic question for which I need some help. As a disclaimer, this whole concept of reactive programming is very new to me and I'm still in the process of this paradigm shift.

Consider this code:

Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

I understand from the docs that nothing happens to the event processing chain until a "subscribe" function is called upon.

But internally, does the spring use (if it wishes) use separate threads asynchronously for every function inside the "map" function? If spring uses a "single" thread for these chains, then what's the real purpose here? Isn't it a blocking and single threaded model based on a different syntax?

I observe that the code always behaves sequentially and with the same thread. What's the threading model of spring webflux?

Upvotes: 6

Views: 10463

Answers (2)

a better oliver
a better oliver

Reputation: 26828

Reactive programming is a programming paradigm and as such it doesn't make any assumptions about the technical implementation.

The reactive manifesto describes reactive systems and brings asynchronous communication and backpressure on the table. Other than that it also makes no assumptions about technical details.

Spring Reactor, the foundation of Webflux, is a library that allows you to easily build reactive systems and follow the reactive programming paradigm.

The thread that is used by a stream depends on the publisher. The default is to use the current thread. Without any intervention, the stream cannot be asynchronous if a publisher is synchronous. And the stream is blocking if a publisher blocks. But take the following example:

Flux.interval(Duration.ofMillis(100))
    .take(2)
    .subscribe(i -> System.out.println(Thread.currentThread().getName()));

Flux.interval publishes on another thread and so the chain runs asynchronously in another thread.

Let's look at another example:

Scheduler scheduler = Schedulers.newElastic("foo");

Flux<Integer> flux = Flux.just(1, 2)
    .subscribeOn(scheduler);

flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));

You will notice that each subscriber runs on its own thread (from the same thread pool though). The publishOn operator is similar.

If you subscribe to a publisher you can use the same programming paradigm regardless of whether it's synchronous or asynchronous. And you can always introduce asynchronous behavior by adding a subscribeOn or publishOn operator.

Upvotes: 6

MuratOzkan
MuratOzkan

Reputation: 2730

TL; DR:

  1. It's a Project Reactor thing, Spring-Webflux doesn't decide what operation runs on which thread.
  2. Project Reactor makes it easier to tell where you're crossing the thread boundaries. Also, there's no (explicit) synchronization going on; making it harder to introduce concurrency problems.

No, it isn't a single threaded model with a different syntax. Project Reactor tries as much as possible to use your main thread to avoid context-switches. In addition, it provides special operators which lets you specify the threads that previous operations run on.

For instance, this modified example would run on different threads; as subscribeOn operator defines which thread pool the whole chain runs on:

Mono.just("ONE")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

Mono.just("TWO")
    .map(item -> func(" A " + item))
    .map(item -> func(" B " + item))
    .map(item -> func(" C " + item))
    .subscribeOn(Schedulers.elastic())
    .subscribe(item -> {
        System.out.println(Thread.currentThread().getName() + " " + item);
    });

In this case, both operations execute on an elastic-x thread; not blocking the main thread. The order of the operations might vary with each run.

Upvotes: 2

Related Questions