Reputation: 35
Firstly, I am confused if there is any benefit using reactive handler method parameters. Secondly, I am experiencing some issue using this technique when I need to read these parameters multiple times.
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
var userDto = user.block();
return userRepository.findByEmail(userDto.getEmail())
.filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
.map(foundUser -> JWT.create()
.withSubject(foundUser.getEmail())
.withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
.withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
.sign(Algorithm.HMAC512(jwtSecret))
)
.map(TokenDto::new);
}
As you can see, I need to block to actually read parameter twice. I tried to zipWith the userRepository result but then I end up with Mono in the Tuple2 structure.
Does Reactor have any solution to this? Maybe there is a function publish/repeat which might by promissing. I have managed to build a pipe but there has been always error in reading request body multiple times.
Thank you.
Error:
"timestamp": "2020-07-03T06:24:23.592+00:00",
"path": "/login",
"status": 400,
"error": "Bad Request",
"message": "Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)",
"requestId": "defc8953-1",
"trace": "org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST \"Request body is missing: public reactor.core.publisher.Mono<org.springframework.http.ResponseEntity<com.ostrozlik.taskagent.web.dto.TokenDto>> com.ostrozlik.taskagent.web.controller.UserController.login(reactor.core.publisher.Mono<com.ostrozlik.taskagent.web.dto.UserDto>)\"\n\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nError has been observed at the following site(s):\n\t|_ checkpoint ⇢ Handler com.ostrozlik.taskagent.web.controller.UserController#login(Mono) [DispatcherHandler]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t|_ checkpoint ⇢ HTTP POST \"/login\" [ExceptionHandlingWebHandler]\nStack trace:\n\t\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleMissingBody(AbstractMessageReaderArgumentResolver.java:230)\n\t\tat org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.lambda$readBody$5(AbstractMessageReaderArgumentResolver.java:194)\n\t\tat reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4219)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)\n\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:174)\n\t\tat reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:115)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onComplete(FluxMapFuseable.java:336)\n\t\tat reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:384)\n\t\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1783)\n\t\tat reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)\n\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)\n\t\tat reactor.core.publisher.Operators.complete(Operators.java:135)\n\t\tat reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:294)\n\t\tat reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:138)\n\t\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)\n\t\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:832)\n"
}
Upvotes: 1
Views: 3242
Reputation: 998
while @123's answer is perfect
there is another "pattern" we can try to avoid nesting
we create a "context object", that "holds a reference to all interactions", so it can be easily used to retrieve them at any point
it might not be beneficial in this simple example, but probably it would be when trying more number of operations
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
Context ctx = new Context();
return user
.doOnNext(ctx::setUserDto) // set ctx
.flatMap((_v) -> userRepository.findByEmail(ctx.getUserDto().getEmail())) // use ctx
.doOnNext(ctx::setUserEntity) // set ctx
.filter((_v) -> bCryptPasswordEncoder.matches(
ctx.getUserDto().getPassword(),
ctx.getUserEntity().getPassword())
) // use ctx
.map((_v) -> JWT.create()
.withSubject(ctx.getUserEntity().getEmail())
.withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
.withClaim(USER_ROLES_CLAIM, ctx.getUserEntity().getRoles()...)
.sign(Algorithm.HMAC512(jwtSecret))
)
.map(TokenDto::new);
}
@Data
@NoArgsConstructor
class Context {
private UserEntity userEntity;
private UserDto userDto;
}
Upvotes: 0
Reputation: 11216
Could do it this way, which won't subscribe to user twice.
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
return user
.flatMap(userDto ->
userRepository
.findByEmail(userDto.getEmail())
//filter is inside flatmap
.filter(foundUser -> bCryptPasswordEncoder.matches(userDto.getPassword(), foundUser.getPassword()))
)
.map(foundUser -> JWT.create()
.withSubject(foundUser.getEmail())
.withExpiresAt(new Date(System.currentTimeMillis() + jwtTtl.toMillis()))
.withClaim(USER_ROLES_CLAIM, foundUser.getRoles().stream().map(RoleEnum::name).collect(Collectors.toList()))
.sign(Algorithm.HMAC512(jwtSecret))
)
.map(TokenDto::new);
}
Upvotes: 2
Reputation: 4642
This is what I could come up with using zip
. Why do you think it's not right to have a Mono
in the tuple2
structure?
public Mono<TokenDto> generateToken(Mono<UserDto> user) {
return Mono.zip(user,user.flatMap(a -> userRepository.findByEmail(a.getEmail())))
.filter(tuple2 -> bCryptPasswordEncoder.matches(tuple2.getT1().getPassword(), tuple2.getT2().getPassword()))
.map(tuple2 -> tuple2.getT2())
.map(foundUser -> {//your mapping logic})
.map(TokenDto::new);
}
Upvotes: 0