user9309329
user9309329

Reputation: 323

Print output from multiple SSH commands running in parallel

I'm trying to write log tool, that will connect to few servers via ssh, open specified log file and print the result to System.out.print. for now, I've achieved getting logs from one source. Starting at SSHManager class that just use Jsch to achieve that.

public void tailLogFile() {
     System.out.println("Starting to monitor logs for " + server.getIp());
     String command = "tail -f " + server.getLogFilePath();
     try {
         Channel channel = getSession().openChannel("exec");
         ((ChannelExec)channel).setCommand(command);
         InputStream commandOutput = channel.getInputStream();
         channel.connect();
         int readByte = commandOutput.read();

         while(readByte != 0xffffffff) {
             readByte = commandOutput.read();
             System.out.print(server.getFontColor().toString() + (char)readByte);
         }
         channel.disconnect();

     } catch (Exception e) {
         e.printStackTrace();
     }
 }

I'm guessing the rest is irrevelant here, it prints coloured logs from SSH to my System.out. But, main purpouse of this program is to log multiple files into one place. So I'v tried following

for(SSHManager sshManager : getSshManagers()) {
       sshManager.tailLogFile();
}

And it is not working now, it starts to print logs from first iteration of for-loop and since while inside SSHManager.tailLogFile() doesn't terminate, it keeps printing logs from the first source. As you can imagine, I'd like those n instances of SSHManager to share System.out and give me output from all sources at same time. I'm wondering what's the easiest way to achieve that? I need to dive into concurrency?

Upvotes: 1

Views: 470

Answers (2)

Michael Dussere
Michael Dussere

Reputation: 498

As for me, I prefer providing an OutputStream for the channel to write to instead of reading from the InputStream it provides me.

I would define something like that:

protected class MyOutputStream extends OutputStream {

    private StringBuilder stringBuilder = new StringBuilder();
    private Object lock;

    public MyOutputStream(Object lock) {
        this.lock = lock;
    }

    @Override
    public void write(int b) throws IOException {
        this.stringBuilder.append(b);

        if (b == '\n') {
            this.parseOutput();
        }
    }

    @Override
    public void write(byte[] b) throws IOException {
        String str = new String(b);
        this.stringBuilder.append(str);

        if (str.contains("\n")) {
            this.parseOutput();
        }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        String str = new String(b, off, len);
        this.stringBuilder.append(str);

        if (str.contains("\n")) {
            this.parseOutput();
        }
    }

    @Override
    public void flush() throws IOException {
    }

    @Override
    public void close() throws IOException {
        LOGGER.info("My output stream has closed");
    }

    private void parseOutput() throws IOException {
        // we split the text but we make sure not to drop the empty strings or the trailing char
        String[] lines = this.stringBuilder.toString().split("\n", -1);

        int num = 0;
        int last = lines.length - 1;
        String trunkated = null;

        // synchronize the writing
        synchronized (this.lock) {
            for (String line : lines) {
                // Dont treat the trunkated last line
                if (num == last && line.length() > 0) {
                    trunkated = line;
                    break;
                }
                // write a full line    
                System.out.print(line);     

                num++;
            }
        }

        // flush the buffer and keep the last trunkated line
        this.stringBuilder.setLength(0);
        if (trunkated != null) {
            this.stringBuilder.append(trunkated);
        }
    }
}

So the usage would be like that:

ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
Object lock = new Object();

ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand("echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);

channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand("sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);

for (ChannelExec channel : channels) {
    while (!channel.isClosed()) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    } 
}

The good point is that you benefit to the multithreading already existing in the Jsch channels and then you avoid the problems of a flooding log that won't let the other logs being printed. It's also easier and more clear to handle each log with a different stream class. The StringBuilder is a good way to accumulate the chars until you get a full line.

Remark also that writing a whole line at once avoids to have one function call per char and to multiply the number of written chars with your systematic server.getFontColor().toString()

Be sure to lock correctly, the code I wrote is not tested.

Upvotes: 1

Martin Prikryl
Martin Prikryl

Reputation: 202474

You have to read all output streams continuously in a non-blocking way.

You can use InputStream.available(), like this:

ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();

ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand(
    "echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.connect();
channels.add(channel);

channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand(
    "sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.connect();
channels.add(channel);

ArrayList<InputStream> outputs = new ArrayList<InputStream>();
for (int i = 0; i < channels.size(); i++)
{
    outputs.add(channels.get(i).getInputStream());
}

Boolean anyOpened = true;
while (anyOpened)
{
    anyOpened = false;
    for (int i = 0; i < channels.size(); i++)
    {
        channel = channels.get(i);
        if (!channel.isClosed())
        {
            anyOpened = true;
            InputStream output = outputs.get(i);
            while (output.available() > 0)
            {
                int readByte = output.read();
                System.out.print((char)readByte);
            }
        }
    }
}

Will get you (assuming Linux server):

one
eins
two
zwei
three
drei

Note that the answer reads the output by bytes/characters. It does not guarantee, that you get a full line, before switching to another session. So you may end up mixing parts of lines from different sessions. You should accumulate bytes/characters in a buffer, looking for a new line, before printing the buffer to an output.

Upvotes: 1

Related Questions