kj007

Reputation: 6254

Spring webflux "Only one connection receive subscriber allowed" if return server response from switchIfEmpty

I want to return an error if the user already exists, and otherwise create a new user.

Here is my handler:

public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
        Mono<String> email = userBOMono.map(UserBO::getEmail);
        Mono<User> userMono = email.flatMap(userRepository::findByEmail);
        // User found: respond with 409 CONFLICT.
        return userMono.flatMap(user -> {
            Mono<ErrorResponse> errorResponseMono = errorHandler.handleEmailAlreadyExist();
            return ServerResponse.status(HttpStatus.CONFLICT)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(errorResponseMono, ErrorResponse.class);
        }).switchIfEmpty(Mono.defer(() -> {
            // No user found: map and save a new one, then respond with 201 CREATED.
            Mono<User> newUserMono = userBOMono.flatMap(userMapping::mapUserBOToUser);
            Mono<User> dbUserMono = newUserMono.flatMap(userRepository::save);
            return ServerResponse.status(HttpStatus.CREATED)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(dbUserMono, User.class);
        }));
}

If the Mono is not empty, it returns a conflict, which is what I want. If it is empty, it should create a new user, but instead it throws the error below:

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
    at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) ~[netty-transport-4.1.27.Final.jar:4.1.27.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]

Update: this is the correct behavior according to the method definition:

switchIfEmpty(Mono<? extends T> alternate)
Fallback to an alternative Mono if this mono is completed without data

That means it works fine when I send an empty Mono in the body:

return ServerResponse.status(HttpStatus.CREATED)
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(Mono.empty(), User.class);

So what is the solution for the switchIfEmpty case if I want to return a response with a Mono body from it?

Upvotes: 1

Views: 10028

Answers (3)

Matheus Sant Ana

Reputation: 690

I got the same error when running my @SpringBootTest class.

The problem seems to be that the response was being written after the methods had already been closed.

I solved it by returning Mono.empty() instead of the full response.

Code Before:

WebClient.create()
.get()
.uri(new URI(UPDATE_COMPANIES_URL))
.exchangeToMono(response -> {
    if (response.statusCode().equals(HttpStatus.OK)) {
        return response.bodyToMono(Boolean.class).thenReturn(Boolean.TRUE);

    } else {
        System.out.println("[sendSecureRequest] Error sending request: " + response.statusCode());
        return response.bodyToMono(Boolean.class).thenReturn(Boolean.FALSE);
    }
}).subscribe();

Code After:

WebClient.create()
.get()
.uri(new URI(UPDATE_COMPANIES_URL))
.exchangeToMono(response -> {
    if (response.statusCode().equals(HttpStatus.OK)) {
        // TODO handle success

    } else {
        System.out.println("[sendSecureRequest] Error sending request: " + response.statusCode());
    }
    return Mono.empty();

}).subscribe();

Upvotes: 0

Vikram Rawat

Reputation: 1662

I assume you use a WebClient to invoke this API. The client should not subscribe more than once; otherwise this error can occur.
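
If two consumers really do need the same result, one option is to store the first read and hand the stored value to later callers. In Reactor the operator for that is Mono.cache(); whether it suits a given handler is a judgment call. Below is a minimal plain-Java sketch of the idea only. CachedReadSketch and Memoized are hypothetical names made up for illustration, not Reactor or Spring types.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model of "read once, share the result"; not Reactor code.
final class CachedReadSketch {

    // Hypothetical helper: invokes the source at most once and stores the value.
    static final class Memoized<T> implements Supplier<T> {
        private final Supplier<T> source;
        private T value;
        private boolean loaded;

        Memoized(Supplier<T> source) {
            this.source = source;
        }

        @Override
        public synchronized T get() {
            if (!loaded) {
                value = source.get(); // the only real read
                loaded = true;
            }
            return value;
        }
    }

    // Returns how many times the underlying source was read by two consumers.
    static int readsAfterTwoConsumers() {
        AtomicInteger reads = new AtomicInteger();
        Supplier<String> body = new Memoized<String>(() -> {
            reads.incrementAndGet();
            return "a@example.com";
        });
        body.get(); // first consumer triggers the real read
        body.get(); // second consumer reuses the stored value
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println(readsAfterTwoConsumers());
    }
}
```

The trade-off is that the stored value stays in memory for as long as the wrapper is referenced.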

Upvotes: 0

kj007

Reputation: 6254

I finally resolved it. I was reading the userBOMono stream twice, which caused WebFlux to throw this error.

Here is the updated code, which works fine:

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
        return userBOMono.flatMap(userBO -> {
            String email = userBO.getEmail();
            Mono<User> userMono = userRepository.findByEmail(email);
            return userMono.flatMap(user -> errorHandler.handleEmailAlreadyExist())
                    .switchIfEmpty(Mono.defer(() -> createNewUser(userBO)));
        });
    }

    private Mono<ServerResponse> createNewUser(UserBO userBO) {
        Mono<User> userMono = Mono.just(userBO)
                .flatMap(userMapping::mapUserBOToUser)
                .flatMap(userRepository::save);
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
                .body(userMono, User.class);
    }
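
To show why reading once matters, here is a rough plain-Java model of the two shapes. It is only a sketch: SingleReadSketch and OneShotBody are hypothetical names, the strings stand in for responses, and Optional.orElseGet is used as a loose analogue of switchIfEmpty(Mono.defer(...)) because its fallback is evaluated only when nothing was found.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Plain-Java model of the fix; none of these types are Spring or Reactor classes.
final class SingleReadSketch {

    // Hypothetical stand-in for a request body that can be consumed only once.
    static final class OneShotBody {
        private final String payload;
        private final AtomicBoolean consumed = new AtomicBoolean(false);

        OneShotBody(String payload) {
            this.payload = payload;
        }

        String read() {
            // A second read fails, mirroring the message in the stack trace.
            if (!consumed.compareAndSet(false, true)) {
                throw new IllegalStateException("Only one connection receive subscriber allowed.");
            }
            return payload;
        }
    }

    // Broken shape: the body is read for the lookup and again in the fallback.
    static String createUserReadingTwice(OneShotBody body,
                                         Function<String, Optional<String>> findByEmail) {
        Optional<String> existing = findByEmail.apply(body.read());
        return existing.map(user -> "409 CONFLICT")
                .orElseGet(() -> "201 CREATED " + body.read()); // second read throws
    }

    // Fixed shape: read once, then hand the value to both branches.
    static String createUserReadingOnce(OneShotBody body,
                                        Function<String, Optional<String>> findByEmail) {
        String email = body.read();
        return findByEmail.apply(email)
                .map(user -> "409 CONFLICT")
                .orElseGet(() -> "201 CREATED " + email); // fallback runs only if no user was found
    }

    public static void main(String[] args) {
        Function<String, Optional<String>> emptyRepo = email -> Optional.empty();
        System.out.println(createUserReadingOnce(new OneShotBody("a@example.com"), emptyRepo));
        try {
            createUserReadingTwice(new OneShotBody("a@example.com"), emptyRepo);
        } catch (IllegalStateException e) {
            System.out.println("second read failed: " + e.getMessage());
        }
    }
}
```

As in the original handler, the broken shape only fails on the "not found" path, because that is the only path that touches the body a second time.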

Upvotes: 2
