vivi
vivi

Reputation: 355

Wrapping blocking client with Mono and execute sequence

I am trying to make a reactive application which needs to execute ssh commands.

Currently, there is an SSH client (based on sshd mina) which is blocking (maybe there is a way to use it in a non-blocking way, but I don't know it). My idea is to create a wrapper around this blocking client, so that I can transform the blocking calls into Mono as in the code below.

public class SshReactiveClient extends AutoCloseable {
  private final SshClient sshClient;

  public SshReactiveClient(final SshClient sshClient) {
    this.sshClient = sshClient;
  }

  public Mono<SshResponse> open(final Duration timeout) {
    return Mono.fromCallable(() -> sshClient.open(timeout))
        .subscribeOn(Schedulers.boundedElastic());
  }

  public Mono<SshResponse> execCommand(final SshCommand command, final Duration timeout) {
    return Mono.fromCallable(() -> sshClient.execCommand(command, timeout))
        .subscribeOn(Schedulers.boundedElastic());
  }

  @Override
  public void close() throws Exception {
    sshClient.close();
  }
}

First, is it a good idea to proceed like that or not? What would be better?

The second point is how to write the code so that I can execute a sequence of ssh command using the responses from the previous commands to execute the next one?

Upvotes: 1

Views: 2069

Answers (1)

Alex
Alex

Reputation: 5982

Your understanding is correct. You need to wrap blocking or sync code and run it on a separate Scheduler. The better way would be if client supports async interface.

To execute commands in a sequence you need to build a flow using reactive API.

execCommand(command1)
        .flatMap(res -> 
             execCommand(getCommand2(res))
        )
        .flatMap(res -> 
             execCommand(getCommand3(res))
        )

There are many other options depending on your requirements. For example, if you need results from command1 & command2 to execute command3, you could just "shift" flow one level down

execCommand(command1)
        .flatMap(res1 -> 
             execCommand(getCommand2(res1))
                  .flatMap(res2 -> 
                        execCommand(getCommand3(res1, res2))
                  )
        )
        

As an alternative, you could apply builder pattern to the reactor flow to collect responses in a sequential flow Builder pattern with Reactor mono

You could also execute command1 and command2 in parallel and use responses from both

Mono.zip(command1, command2)
     .flatMap(tuple -> 
           execCommand(getCommand3(tuple.getT1(), tuple.getT2()))
     )

Upvotes: 2

Related Questions