Manshavi Kumar

Reputation: 133

Main thread is not waiting for subscribers to finish their task in reactive subscriber

I have a Spring service that needs to fetch data using ten different methods.

I would like these methods to run in parallel, perform their DB operations, and return to the parent thread. The parent thread should wait until all the responses have arrived and only then return a response.

In my current approach, I am using Reactor's Mono to execute all the methods asynchronously, but the main thread does not wait for the subscribed methods to finish.

Below are the two methods I subscribe to:

private Mono<BaseResponse> getProfileDetails(long profileId){
        return new Mono<BaseResponse>() {

            @Override
            public void subscribe(Subscriber<? super BaseResponse> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getProfileDetails");
                s.onNext(new BaseResponse());
            }
        };
    }

private Mono<Address> getAddressDetails(long profileId){
        return new Mono<Address>() {

            @Override
            public void subscribe(Subscriber<? super Address> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getAddressDetails");
                s.onNext(new Address());
            }
        };
    }

And below is my main method

public BaseResponse getDetails(long profileId){
        ExecutorService executors = Executors.newFixedThreadPool(2);

        Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
        Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));

        List<BaseResponse> list = new ArrayList<>();

        profileDetail.mergeWith(addressDetail)
        .subscribe(consumer -> {
                list.add(consumer);
        });

        System.out.println("list: "+new Gson().toJson(list));
        executors.shutdown();

        return response;
    }

Below is my output:

list: []
Inside getProfileDetails
Inside getAddressDetails

My output shows that the main thread is not waiting for the subscriber to finish its task, so how can I handle this situation?

Upvotes: 3

Views: 2597

Answers (1)

Michael Berry

Reputation: 72254

I'm assuming your getProfileDetails() and getAddressDetails() methods are just placeholders, as they don't make much sense as written.

That being said, if this is your entire application, and you genuinely just want to block until everything has completed, you may as well replace your current subscribe() call with doOnNext(), then call blockLast():

profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
        list.add(consumer);
})
.blockLast();

Blocking is usually ill-advised in reactive applications, and for good reason, but in this case you simply want to block before exiting, so I can't see much downside here.
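
For reference, here is a minimal, self-contained sketch of how the pieces could fit together, assuming reactor-core 3.3 or later is on the classpath. Note that blockLast() only returns once the merged publisher signals completion, so the sources have to complete; the sketch uses Mono.fromCallable for that rather than subclassing Mono. The class name BlockingMergeSketch, the stub BaseResponse and Address types (with Address extending BaseResponse), the shortened sleep, the boundedElastic scheduler and the five-second timeout are all assumptions made for illustration, not part of the original code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingMergeSketch {

    // Hypothetical stand-ins for the question's BaseResponse and Address types.
    static class BaseResponse {}
    static class Address extends BaseResponse {}

    // fromCallable defers the blocking work until subscription, then emits
    // the value followed by a completion signal.
    static Mono<BaseResponse> getProfileDetails(long profileId) {
        return Mono.<BaseResponse>fromCallable(() -> {
            Thread.sleep(500); // simulated DB operation
            return new BaseResponse();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    static Mono<BaseResponse> getAddressDetails(long profileId) {
        return Mono.<BaseResponse>fromCallable(() -> {
            Thread.sleep(500); // simulated DB operation
            return new Address();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    static List<BaseResponse> getDetails(long profileId) {
        // Thread-safe list, because the two sources emit on different threads.
        List<BaseResponse> list = new CopyOnWriteArrayList<>();

        getProfileDetails(profileId)
            .mergeWith(getAddressDetails(profileId))
            .doOnNext(list::add)
            // Blocks the calling thread until both sources have completed,
            // or throws if that takes longer than the timeout.
            .blockLast(Duration.ofSeconds(5));

        return list;
    }

    public static void main(String[] args) {
        System.out.println("received: " + getDetails(1L).size()); // prints "received: 2"
    }
}
```

Because each Mono is subscribed on its own scheduler thread, the two simulated DB calls run concurrently, and the calling thread only continues once both results are in the list.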

Upvotes: 6
