Matelutex

Reputation: 2210

Webflux repeatWhenEmpty or retryWhen

I use Spring Boot and reactive programming with WebFlux. I want to repeat a call to my endpoint until data is available (until something is returned).

I want to call commandControllerApi.findById until the displayCommand is returned with status == SUCCESS. How can I tell WebFlux that this part of my chain should be called, for example, 5 times, because the data in my database should appear after 5-10 seconds?

I think the current code re-runs the whole chain rather than only the relevant part of it (.flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))).

My code:

public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
              Command command = new Command ();
              command.setAgentId(agentId.toString());
              command.setCommandType(COMMAND_TYPE);
              command.setArguments(createArguments());
              command.setCreatedBy(
                  user.getEmail());
              return commandControllerApi.saveCommand(command);
            })
        //       .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
      .flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))
        .filter(displayCommand -> displayCommand.getStatus().equals(OaCommandStatus.SUCCESS))
        .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
//        .repeatWhenEmpty(
//            Repeat.onlyIf(repeatContext -> true)
//                .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
//                .timeout(Duration.ofSeconds(30)))
        .filter(
            commandResponse ->
                commandResponse.getStatus() != null
                    && commandResponse.getStatus().equals(CommandStatus.SUCCESS))
        .map(commandResponse -> true)
        .switchIfEmpty(Mono.just(false));
  }

And below is the method that calls the method above:

 public Flux<CredConfiguration> saveCredentials(
      Mono<FlowConfCredentials> flowCredentials, UUID agentId) {
    return flowCredentials
        .filterWhen(cred -> validationService.validateCredentials(cred, agentId))
        .flatMapMany(
            flowConfCredentials -> {
              if (condition1()) {
                return caveCredentials(flowConfCredentials);
              }
              if (condition2()) {
                return saveCredentialsForUser(flowConfCredentials.getExistingCredentials());
              }
              return Flux.error(new EmptyConfigurationException(CREDENTIALS_MESSAGE));
            });
  }

Upvotes: 3

Views: 4328

Answers (1)

Phil Clay

Reputation: 4534

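To repeat only the subscription to the Mono returned by findById, without resubscribing to the upstream saveCommand/getUser, move the filter/repeatWhenEmpty inside the flatMap that calls findById. Note that retryWhen reacts only to error signals; a filter that rejects the element makes the Mono complete empty, which is not an error, so repeatWhenEmpty is the operator that applies here.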

public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
              Command command = new Command();
              command.setAgentId(agentId.toString());
              command.setCommandType(COMMAND_TYPE);
              command.setArguments(createArguments());
              command.setCreatedBy(
                  user.getEmail());
              return commandControllerApi.saveCommand(command);
            })
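        .flatMap(saveResponse -> commandControllerApi.findById(saveResponse.getCommandId())
            // A response that is not yet SUCCESS is dropped, leaving this inner Mono empty.
            // Comparing from the constant side avoids a NullPointerException on a null status.
            .filter(findResponse -> OaCommandStatus.SUCCESS.equals(findResponse.getStatus()))
            // An empty result resubscribes to findById only; saveCommand and getUser are not re-run.
            .repeatWhenEmpty(
                Repeat.onlyIf(repeatContext -> true)
                    .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
                    .timeout(Duration.ofSeconds(30))))
        // hasElement() maps an emitted element to true and an empty completion to false.
        .hasElement();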
}
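
The Repeat helper used above comes from the reactor-extra addon (reactor.retry.Repeat). If that dependency is not available, roughly the same idea can be sketched with reactor-core alone. Everything in the sketch below is a hypothetical stand-in rather than the real API from the question: saveCommand and findStatus only count how often they are subscribed to, statuses are plain strings, and the delays are shortened so it finishes quickly. It also assumes the lookup returns a cold Mono, so that each resubscription performs a fresh call.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class PollUntilSuccess {
    // Hypothetical stand-ins for commandControllerApi; they only count calls.
    static final AtomicInteger saveCalls = new AtomicInteger();
    static final AtomicInteger findCalls = new AtomicInteger();

    static Mono<String> saveCommand() {
        return Mono.fromSupplier(() -> "cmd-" + saveCalls.incrementAndGet());
    }

    // Reports PENDING until the given number of lookups has been made.
    static Mono<String> findStatus(String commandId, int successOnCall) {
        return Mono.fromSupplier(
            () -> findCalls.incrementAndGet() >= successOnCall ? "SUCCESS" : "PENDING");
    }

    static Mono<Boolean> validate(int successOnCall, Duration pollDelay, Duration maxWait) {
        return saveCommand()
            .flatMap(id -> findStatus(id, successOnCall)
                // Drop anything that is not SUCCESS, so the inner Mono completes empty.
                .filter("SUCCESS"::equals)
                // On each empty completion, wait pollDelay and resubscribe to findStatus only.
                .repeatWhenEmpty(completions -> completions.delayElements(pollDelay))
                // Stop polling after maxWait and fall back to an empty Mono.
                .timeout(maxWait, Mono.empty()))
            // true if SUCCESS was seen, false if polling ended empty.
            .hasElement();
    }

    public static void main(String[] args) {
        Boolean ok = validate(3, Duration.ofMillis(50), Duration.ofSeconds(5)).block();
        System.out.println(ok + " saves=" + saveCalls.get() + " finds=" + findCalls.get());
    }
}
```

Because repeatWhenEmpty sits inside the flatMap, the save counter should stay at 1 however many lookups are made. In real code the poll delay and overall timeout would be closer to the 5 and 30 seconds used above.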

Upvotes: 4
