LiuYan 刘研
LiuYan 刘研

Reputation: 1624

Implementing pipe in java, the second program does not output in time (ping | grep)

I'm writing an IRC bot that can execute several commands and send the result to an IRC channel. The commands are connected by a pipe that is implemented in Java.

In most cases it works fine. But when executing ping 127.0.0.1 | grep -in ttl, the grep program does not print its output promptly: output only appears about once every 38 seconds, as shown in the screenshot below.

java PipingTest screenshot 1

It looks like something is buffering the output. How can I make the second program produce its output promptly?

Sample code

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Runs two external commands connected like a shell pipeline
 * ({@code ping 127.0.0.1 | grep -in --color=always ttl}).
 *
 * <p>Each command runs in its own {@link CommandRunner} task, a {@link Pipe}
 * task copies the first command's stdout into the second command's stdin, and
 * a {@link WatchDog} task kills both commands after a fixed timeout.
 */
public class PipingTest
{
    /** Shared pool running the command tasks, the pipe pump, stderr drains and the watchdog. */
    static ExecutorService executor = Executors.newFixedThreadPool (10);

    /**
     * Starts one external process and consumes its output.
     *
     * <p>When {@code isPipingOut} is true the process's stdout is handed to a
     * {@link Pipe} that feeds {@code nextCommand}; otherwise stdout is printed
     * line by line. stderr is always printed line by line.
     */
    static class CommandRunner implements Runnable
    {
        ProcessBuilder pb;
        String program;
        // volatile: assigned on the command's thread, read by the WatchDog thread.
        volatile Process process;
        boolean isPipingOut = false;
        CyclicBarrier barrier;
        CommandRunner previousCommand;
        CommandRunner nextCommand;
        public OutputStream out = null;
        public InputStream in = null;
        public InputStream err = null;
        public OutputStream nextOut = null;

        /**
         * @param pb              builder for the process to start; its first argument is used as the display name
         * @param isPipingOut     true if this command's stdout is piped into {@code nextCommand}
         * @param barrier         rendezvous point shared with the neighbouring command
         * @param previousCommand upstream command, or null; if non-null its {@code nextCommand} is set to this
         * @param nextCommand     downstream command, or null; if non-null its {@code previousCommand} is set to this
         */
        public CommandRunner (ProcessBuilder pb, boolean isPipingOut, CyclicBarrier barrier, CommandRunner previousCommand, CommandRunner nextCommand)
        {
            this.pb = pb;
            program = pb.command().get(0);
            this.isPipingOut = isPipingOut;
            this.barrier = barrier;
            this.previousCommand = previousCommand;
            if (previousCommand!=null)
                previousCommand.nextCommand = this;
            this.nextCommand = nextCommand;
            if (nextCommand!=null)
                nextCommand.previousCommand = this;
        }

        /**
         * Starts the process, synchronizes with the neighbouring command on the
         * barrier, consumes stdout/stderr and finally reports the exit value.
         * Any failure is printed and the started process (if any) is destroyed.
         */
        @Override
        public void run ()
        {
            System.out.println ("Command [" + program + "] thread ID = " + Thread.currentThread().getId());
            try
            {
                // Fail before starting anything: without a downstream command there is nothing to pipe into.
                if (isPipingOut && nextCommand == null)
                {
                    throw new IllegalStateException (program + ": piping out requires a next command");
                }

                process = pb.start ();
                out = process.getOutputStream ();
                in = process.getInputStream ();
                err = process.getErrorStream ();

                if (isPipingOut)
                {
                    System.out.println (program + ": Synchronizing with [" + nextCommand.program + "] ...");
                    // After the barrier the downstream command has started, so nextCommand.out is set.
                    barrier.await ();
                    System.out.println ("ok, synchronized, now go piping");
                    executor.execute (new Pipe(in, nextCommand.out));

                    // stdout is pumped by the Pipe task; this thread only has stderr left to drain.
                    drain (err, "stderr");
                }
                else
                {
                    // A command with no upstream neighbour has nobody to rendezvous with.
                    if (previousCommand != null)
                    {
                        System.out.println (program + ": Synchronizing with [" + previousCommand.program + "] ...");
                        barrier.await ();
                    }

                    // Drain stderr concurrently with stdout: reading them one after the other
                    // can deadlock once the child blocks on a full stderr pipe.
                    Future<Void> stderrDrain = executor.submit (new Callable<Void> ()
                    {
                        @Override
                        public Void call () throws IOException
                        {
                            drain (err, "stderr");
                            return null;
                        }
                    });
                    drain (in, "stdout");
                    // Wait so that all stderr output is printed before the exit value.
                    stderrDrain.get ();
                }

                int rc = process.waitFor ();
                System.out.println (program + ": exited value " + rc);
            }
            catch (InterruptedException e)
            {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt ();
                e.printStackTrace();
                destroyProcess ();
            }
            catch (Exception e)
            {
                e.printStackTrace();
                destroyProcess ();
            }
        }

        /** Prints every line of {@code stream} to System.out until end of stream. */
        private void drain (InputStream stream, String label) throws IOException
        {
            System.out.println (program + ": Consuming the " + label + "...");
            BufferedReader br = new BufferedReader (new InputStreamReader(stream));
            String line;
            while ((line = br.readLine()) != null)
            {
                System.out.println (line);
            }
            System.out.println (program + ": " + label + " consumed");
        }

        /** Destroys the process if it was started; used on failure paths so the child is not leaked. */
        private void destroyProcess ()
        {
            Process started = process;
            if (started != null)
            {
                started.destroy ();
            }
        }
    }

    /**
     * Copies everything readable from {@code in} to {@code out}, flushing after
     * each chunk, and closes both streams when done or when the copy fails.
     */
    static class Pipe implements Runnable
    {
        public OutputStream out = null;
        public InputStream in = null;

        /**
         * @param in  stream to read from (the upstream command's stdout)
         * @param out stream to write to (the downstream command's stdin)
         */
        public Pipe (InputStream in, OutputStream out)
        {
            this.in = in;
            this.out = out;
        }

        @Override
        public void run ()
        {
            System.out.println ("Pipe thread ID = " + Thread.currentThread().getId());
            System.out.println ("Piping...");
            long nTotal = 0;
            int nRead;
            try
            {
                byte[] small_buffer = new byte[32];
                while (-1 != (nRead = in.read(small_buffer)))
                {
                    out.write (small_buffer, 0, nRead);
                    nTotal += nRead;
                    // Hand each chunk to the next command immediately. Any remaining delay
                    // comes from the downstream program buffering its own output
                    // (e.g. grep without --line-buffered), not from this pipe.
                    out.flush ();
                    System.out.println (new java.sql.Time(System.currentTimeMillis()) + "    piped " + nRead + " bytes, total=" + nTotal);
                }
                System.out.println ("Total piped " + nTotal + " bytes");
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Always close both ends: closing 'out' is what delivers EOF to the
                // downstream command, even when the copy was aborted by an error.
                closeQuietly (in);
                closeQuietly (out);
            }
        }

        /** Closes {@code stream} if non-null, printing (not propagating) any IOException. */
        private static void closeQuietly (Closeable stream)
        {
            if (stream == null)
            {
                return;
            }
            try
            {
                stream.close ();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    /**
     * Sleeps for {@code timeout} seconds, then destroys every started command
     * and shuts the shared executor down. If interrupted while sleeping it
     * exits without killing anything.
     */
    static class WatchDog implements Runnable
    {
        int timeout = 0;
        List<CommandRunner> commands;

        /**
         * @param timeout  seconds to wait before killing the commands
         * @param commands commands whose processes are destroyed on timeout
         */
        public WatchDog (int timeout, List<CommandRunner> commands)
        {
            this.timeout = timeout;
            this.commands = commands;
        }

        @Override
        public void run ()
        {
            System.out.println ("WatchDog thread ID = " + Thread.currentThread().getId());
            try
            {
                TimeUnit.SECONDS.sleep (timeout);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt ();
                e.printStackTrace();
                return;
            }

            System.out.println ("WatchDog timeout, killing commands...");
            for (CommandRunner command : commands)
            {
                System.out.println ("Killing command " + command.pb.command().get(0));
                // The process is null if the command never started (or failed to start).
                Process running = command.process;
                if (running != null)
                {
                    running.destroy ();
                }
            }
            executor.shutdown ();
        }

    }

    /**
     * Wires up {@code ping 127.0.0.1 | grep -in --color=always ttl} and runs it
     * under a 90-second watchdog.
     */
    public static void main (String[] args)
    {
        ProcessBuilder pbCmd1, pbCmd2;
        pbCmd1 = new ProcessBuilder ("ping", "127.0.0.1");
        //pbCmd1 = new ProcessBuilder ("yes", "ttl=123");
        // NOTE: grep buffers its output when stdout is not a terminal, so matches show up
        // in bursts; adding "--line-buffered" to the arguments makes it print per line.
        pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "ttl");
        //pbCmd1 = new ProcessBuilder ("cat", "/etc/passwd");
        //pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "root");
        CyclicBarrier barrier = new CyclicBarrier(2);

        List<CommandRunner> commands = new ArrayList<CommandRunner> ();
        CommandRunner cmd1 = new CommandRunner (pbCmd1, true, barrier, null, null);
        CommandRunner cmd2 = new CommandRunner (pbCmd2, false, barrier, cmd1, null);

        commands.add (cmd1);
        commands.add (cmd2);
        WatchDog watchdog = new WatchDog (90, commands);

        executor.execute (cmd1);
        executor.execute (cmd2);
        executor.execute (watchdog);
    }
}

Upvotes: 2

Views: 541

Answers (1)

dwurf
dwurf

Reputation: 12769

grep is not flushing its output buffer on every line. You will see something similar if you run something like:

ping 127.0.0.1 | grep -in ttl | cat

You can fix this by running grep with the --line-buffered option:

ping 127.0.0.1 | grep --line-buffered -in ttl

or a more general solution with stdbuf:

ping 127.0.0.1 | stdbuf -o0 grep -in ttl

See also: How to 'grep' a continuous stream?

Upvotes: 1

Related Questions