John Snow

Reputation: 35

Webflux - Read mono (request) multiple times

First, I am not sure whether there is any benefit to using reactive handler method parameters. Second, I run into a problem with this technique when I need to read such a parameter more than once.

public Mono<TokenDto> generateToken(Mono<UserDto> user) {
        var userDto = user.block();
        return userRepository.findByEmail(userDto.getEmail())
                .filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
                .map(foundUser -> JWT.create()
                        .withSubject(foundUser.getEmail())
                        .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
                        .withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
                        .sign(Algorithm.HMAC512(jwtSecret))
                )
                .map(TokenDto::new);
    }

As you can see, I have to block in order to read the parameter twice. I tried to zipWith the userRepository result, but then I end up with a Mono inside the Tuple2 structure.

Does Reactor have a solution for this? Maybe there is an operator such as publish or repeat that might be promising. I have managed to build a pipeline, but there was always an error about reading the request body multiple times.

Thank you.

Error:

{
    "timestamp": "2020-07-03T06:24:23.592+00:00",
    "path": "/login",
    "status": 400,
    "error": "Bad Request",
    "message": "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)",
    "requestId": "defc8953-1",
    "trace": "org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST \"Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)\"\n\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nError has been observed at the following site(s):\n\t|_ checkpoint ⇢ Handler com.ostrozlik.taskagent.web.controller.UserController#login(Mono) [DispatcherHandler]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ HTTP POST \"/login\" [ExceptionHandlingWebHandler]\nStack trace:\n\t\tat 
org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\t\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.lambda$readBody$5(AbstractMessageReaderArgumentResolver.java:194)\n\t\tat reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4219)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)\n\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:174)\n\t\tat reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:336)\n\t\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:384)\n\t\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\n\t\tat reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.Operators.complete(Operators.java:135)\n\t\tat reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:294)\n\t\tat reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:138)\n\t\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)\n\t\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:832)\n"
}

Upvotes: 1

Views: 3242

Answers (3)

keemahs

Reputation: 998

While @123's answer is perfect, there is another "pattern" we can try in order to avoid nesting: create a "context object" that holds a reference to every intermediate result, so each one can easily be retrieved at any later point in the chain.

It might not pay off in this simple example, but it probably would with a larger number of operations.

public Mono<TokenDto> generateToken(Mono<UserDto> user) {

  Context ctx = new Context();

  return user
    .doOnNext(ctx::setUserDto) // set ctx
    .flatMap((_v) -> userRepository.findByEmail(ctx.getUserDto().getEmail())) // use ctx
    .doOnNext(ctx::setUserEntity) // set ctx
    .filter((_v) -> bCryptPasswordEncoder.matches(
      ctx.getUserDto().getPassword(),
      ctx.getUserEntity().getPassword())
    ) // use ctx
    .map((_v) -> JWT.create()
      .withSubject(ctx.getUserEntity().getEmail())
      .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
      .withClaim(USER_ROLES_CLAIM, ctx.getUserEntity().getRoles()...)
      .sign(Algorithm.HMAC512(jwtSecret))
    )
    .map(TokenDto::new);
}

@Data
@NoArgsConstructor
class Context {

  private UserEntity userEntity;

  private UserDto userDto;
}

Upvotes: 0

123

Reputation: 11216

You could do it this way, which won't subscribe to user twice.

public Mono<TokenDto> generateToken(Mono<UserDto> user) {
    return user
        .flatMap(userDto ->
            userRepository
                .findByEmail(userDto.getEmail())
                //filter is inside flatmap
                .filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
        )
        .map(foundUser -> JWT.create()
            .withSubject(foundUser.getEmail())
            .withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
            .withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
            .sign(Algorithm.HMAC512(jwtSecret))
        )
        .map(TokenDto::new);
}
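
For anyone who wants to try the nesting in isolation, here is a minimal self-contained sketch of the same shape. Everything in it is a hypothetical stand-in: the `UserDto` and `StoredUser` classes, the `findByEmail` lookup, the plain `equals` password check (the answer above uses `bCryptPasswordEncoder.matches`), and the string "token". The point is only that the lambda passed to `flatMap` keeps `userDto` in scope for the inner `filter`, while the source Mono is subscribed once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class NestedFlatMapSketch {

    // Hypothetical stand-in for the request DTO.
    static final class UserDto {
        final String email;
        final String password;
        UserDto(String email, String password) { this.email = email; this.password = password; }
    }

    // Hypothetical stand-in for the stored user entity.
    static final class StoredUser {
        final String email;
        final String password;
        StoredUser(String email, String password) { this.email = email; this.password = password; }
    }

    // Hypothetical repository lookup; a real one would query a database.
    static Mono<StoredUser> findByEmail(String email) {
        return Mono.just(new StoredUser(email, "secret"));
    }

    // The flatMap lambda closes over userDto, so the inner filter sees both values.
    static Mono<String> login(Mono<UserDto> user) {
        return user
            .flatMap(userDto -> findByEmail(userDto.email)
                // plain equals is an assumption for brevity, not a real password check
                .filter(found -> found.password.equals(userDto.password)))
            .map(found -> "token-for-" + found.email);
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        // The supplier runs once per subscription, so the counter shows how often the source is read.
        Mono<UserDto> request = Mono.fromSupplier(() -> {
            subscriptions.incrementAndGet();
            return new UserDto("a@example.com", "secret");
        });
        System.out.println(login(request).block());
        System.out.println("subscriptions: " + subscriptions.get());
    }
}
```

If the password does not match, the inner `filter` yields an empty Mono and the whole chain completes without a token, which the caller can handle with `switchIfEmpty`.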

Upvotes: 2

Prashant Pandey

Reputation: 4642

This is what I could come up with using zip. Why do you think it's not right to have a Mono in the Tuple2 structure?

public Mono<TokenDto> generateToken(Mono<UserDto> user) {

        return Mono.zip(user,user.flatMap(a -> userRepository.findByEmail(a.getEmail())))
                   .filter(tuple2 -> bCryptPasswordEncoder.matches(tuple2.getT1().getPassword(), tuple2.getT2().getPassword()))
                   .map(tuple2 -> tuple2.getT2())
                   .map(foundUser -> { /* your mapping logic */ })
                   .map(TokenDto::new);

    }
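
One thing to watch with this version: `user` appears twice inside `Mono.zip`, so it is subscribed twice. When the source is a request body that can be read only once, that is likely what leads to the "Request body is missing" error from the question. `Mono#zipWhen` builds the same `Tuple2` from a single subscription. Below is a rough sketch that counts subscriptions; the `findPasswordByEmail` lookup and the `countingSource` helper are hypothetical stand-ins, not part of the original code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipWhenSketch {

    // Hypothetical lookup standing in for userRepository.findByEmail.
    static Mono<String> findPasswordByEmail(String email) {
        return Mono.just("stored-" + email);
    }

    // Same shape as the zip answer: the source is passed to zip twice.
    static Mono<Tuple2<String, String>> withZip(Mono<String> email) {
        return Mono.zip(email, email.flatMap(ZipWhenSketch::findPasswordByEmail));
    }

    // zipWhen derives the second Mono from the first value, within one subscription.
    static Mono<Tuple2<String, String>> withZipWhen(Mono<String> email) {
        return email.zipWhen(ZipWhenSketch::findPasswordByEmail);
    }

    // The supplier runs once per subscription, so the counter records each read of the source.
    static Mono<String> countingSource(AtomicInteger counter) {
        return Mono.fromSupplier(() -> {
            counter.incrementAndGet();
            return "a@example.com";
        });
    }

    public static void main(String[] args) {
        AtomicInteger zipCount = new AtomicInteger();
        withZip(countingSource(zipCount)).block();

        AtomicInteger zipWhenCount = new AtomicInteger();
        withZipWhen(countingSource(zipWhenCount)).block();

        System.out.println("zip subscriptions: " + zipCount.get());
        System.out.println("zipWhen subscriptions: " + zipWhenCount.get());
    }
}
```

Applied to the method above, the first line would become `user.zipWhen(a -> userRepository.findByEmail(a.getEmail()))`, with the `filter` and `map` steps on the tuple left unchanged.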

Upvotes: 0
