Rakshith R Pai
Rakshith R Pai

Reputation: 192

How to change execution thread in Reactor

@RequestMapping(value    = "/try",
      method   = RequestMethod.GET)
      @ResponseBody
public String demo(){
List<String>data=new ArrayList<>();
data.add("A1");
data.add("A2");
data.add("A3");
data.add("A4");

Flux.fromIterable(data).subscribe(s->printStatement(s));
return "done";
}

public  void printStatement(String s){
long i;
for(i=0;i<1000000000;i++)
{}
LOGGER.info(s+"------"+Thread.currentThread().getId());
}

Here in the above example i was hoping that the tread id would be different(hopping asynchronouslyexecution).From log i could see that same tread is executing the entire process

Log:

2018-05-02 03:24:42.387  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A1------26
2018-05-02 03:24:44.118  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A2------26
2018-05-02 03:24:44.418  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A3------26
2018-05-02 03:24:44.717  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A4------26

How i do i make sure its executing asynchronously.

Upvotes: 0

Views: 1035

Answers (2)

jwpol
jwpol

Reputation: 1475

Remember that Project Reactor is concurrency-agnostic, that is, it does not impose on you any threading model. Moreover, the handling of the events happens on the thread which performed the subscription. You use WebFlux, so the subscription happens on one of the threads from the nio pool. In Unix based systems you would see epoll thread. Nevertheless, only one thread is involved because you don't use any Schedulers nor do you use flatMap which by definition is asynchronous.

Very good reading: https://projectreactor.io/docs/core/release/reference/#schedulers

Upvotes: 0

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

The execution model of Reactor is that most operators don't change the thread for you (except when time is involved). The library offers two operators that allow switching to threads, publishOn (the most common) and subscribeOn.

For example Flux.fromIterable(data).publishOn(Schedulers.newSingle("example")).subscribe(...) would be the way to go here.

Note that WebFlux's model is that it starts the processing of the chain in the Netty threads, these nio threads that you see. It is thus very important that you don't block these threads (that would prevent processing of further incoming requests entirely).

Schedulers offer factory methods for various Scheduler flavors, which is a Reactor abstraction (more or less on top of ExecutorService).

Upvotes: 1

Related Questions