Reputation: 1624
I'm writing an IRC bot which can execute several commands and output the result to IRC channel. These commands are connected using pipe which is implemented in java.
In most cases, it works fine. But when executing ping 127.0.0.1 | grep -in ttl
, the grep
program does not output in time: it only output in about every 38 seconds, see the screenshot below.
It looks like somewhere is buffering the output. How to let the second program output in time?
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class PipingTest
{
static ExecutorService executor = Executors.newFixedThreadPool (10);
static class CommandRunner implements Runnable
{
ProcessBuilder pb;
String program;
Process process;
boolean isPipingOut = false;
CyclicBarrier barrier;
CommandRunner previousCommand;
CommandRunner nextCommand;
public OutputStream out = null;
public InputStream in = null;
public InputStream err = null;
public OutputStream nextOut = null;
public CommandRunner (ProcessBuilder pb, boolean isPipingOut, CyclicBarrier barrier, CommandRunner previousCommand, CommandRunner nextCommand)
{
this.pb = pb;
program = pb.command().get(0);
this.isPipingOut = isPipingOut;
this.barrier = barrier;
this.previousCommand = previousCommand;
if (previousCommand!=null)
previousCommand.nextCommand = this;
this.nextCommand = nextCommand;
if (nextCommand!=null)
nextCommand.previousCommand = this;
}
@Override
public void run ()
{
System.out.println ("Command [" + program + "] thread ID = " + Thread.currentThread().getId());
try
{
process = pb.start ();
out = process.getOutputStream ();
in = process.getInputStream ();
err = process.getErrorStream ();
if (! isPipingOut)
{
System.out.println (program + ": Synchronizing with [" + previousCommand.program + "] ...");
barrier.await ();
}
if (isPipingOut)
{
System.out.println (program + ": Synchronizing with [" + nextCommand.program + "] ...");
barrier.await ();
System.out.println ("ok, synchronized, now go piping");
executor.execute (new Pipe(in, nextCommand.out));
}
BufferedReader br = null;
String line;
if (! isPipingOut)
{
System.out.println (program + ": Consuming the stdout...");
br = new BufferedReader (new InputStreamReader(in));
while ((line = br.readLine()) != null)
{
System.out.println (line);
}
System.out.println (program + ": stdout consumed");
}
System.out.println (program + ": Consuming the stderr...");
br = new BufferedReader (new InputStreamReader(err));
while ((line = br.readLine()) != null)
{
System.out.println (line);
}
System.out.println (program + ": stderr consumed");
int rc = process.waitFor ();
System.out.println (program + ": exited value " + rc);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
static class Pipe implements Runnable
{
public OutputStream out = null;
public InputStream in = null;
public Pipe (InputStream in, OutputStream out)
{
this.in = in;
this.out = out;
}
@Override
public void run ()
{
System.out.println ("Pipe thread ID = " + Thread.currentThread().getId());
System.out.println ("Piping...");
long nTotal = 0;
int nRead = 0;
try
{
byte[] small_buffer = new byte[32];
while (-1 != (nRead = in.read(small_buffer)))
{
out.write (small_buffer, 0, nRead);
nTotal += nRead;
out.flush (); // let next command get piped data as soon as possible, does it work?
System.out.println (new java.sql.Time(System.currentTimeMillis()) + " piped " + nRead + " bytes, total=" + nTotal);
}
System.out.println ("Total piped " + nTotal + " bytes");
in.close ();
out.flush ();
out.close ();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
static class WatchDog implements Runnable
{
int timeout = 0;
List<CommandRunner> commands;
public WatchDog (int timeout, List<CommandRunner> commands)
{
this.timeout = timeout;
this.commands = commands;
}
@Override
public void run ()
{
System.out.println ("WatchDog thread ID = " + Thread.currentThread().getId());
try
{
TimeUnit.SECONDS.sleep (timeout);
System.out.println ("WatchDog timeout, killing commands...");
for (CommandRunner command : commands)
{
System.out.println ("Killing command " + command.pb.command().get(0));
command.process.destroy ();
}
executor.shutdown ();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main (String[] args)
{
ProcessBuilder pbCmd1, pbCmd2;
pbCmd1 = new ProcessBuilder ("ping", "127.0.0.1");
//pbCmd1 = new ProcessBuilder ("yes", "ttl=123");
pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "ttl");
//pbCmd1 = new ProcessBuilder ("cat", "/etc/passwd");
//pbCmd2 = new ProcessBuilder ("grep", "-in", "--color=always", "root");
CyclicBarrier barrier = new CyclicBarrier(2);
List<CommandRunner> commands = new ArrayList<CommandRunner> ();
CommandRunner cmd1 = new CommandRunner (pbCmd1, true, barrier, null, null);
CommandRunner cmd2 = new CommandRunner (pbCmd2, false, barrier, cmd1, null);
commands.add (cmd1);
commands.add (cmd2);
WatchDog watchdog = new WatchDog (90, commands);
executor.execute (cmd1);
executor.execute (cmd2);
executor.execute (watchdog);
}
}
Upvotes: 2
Views: 541
Reputation: 12769
grep is not flushing its output buffer on every line. You will see something similar if you run something like:
ping 127.0.0.1 | grep -in ttl | cat
You can fix this by running grep with the --line-buffered
option:
ping 127.0.0.1 | grep --line-buffered -in ttl
or a more general solution with stdbuf:
ping 127.0.0.1 | stdbuf -o0 grep -in ttl
See also: How to 'grep' a continuous stream?
Upvotes: 1