beemtee
beemtee

Reputation: 941

Merging a Python script's subprocess' stdout and stderr while keeping them distinguishable

I would like to direct a Python script's subprocess' stdout and stderr into the same file. What I don't know is how to make the lines from the two sources distinguishable. (For example, prefix the lines from stderr with an exclamation mark.)

In my particular case there is no need for live monitoring of the subprocess, the executing Python script can wait for the end of its execution.

Upvotes: 60

Views: 29799

Answers (7)

Shardj
Shardj

Reputation: 1969

Improving on T.Rojan's code so it works when stderr or stdout receive content longer than one line.

import os  # os.read() is used to read the pipes without blocking on a full line

# Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
process = subprocess.Popen([sys.executable, temp_file.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Route each pipe's file descriptor to the logger call for that stream.
log_for = {
  process.stdout.fileno(): logger.info,
  process.stderr.fileno(): logger.error,
}
# Bytes received after the last newline of each stream, i.e. a line that is
# not complete yet.
partial = dict.fromkeys(log_for, b'')

poll = select.poll()
for fd in log_for:
  poll.register(fd, select.POLLIN | select.POLLHUP)

# Capture the output while the process is running. Each ready descriptor is
# read once with os.read(), which returns whatever is available instead of
# waiting for a newline or EOF, so a quiet stream can never starve the other
# one (looping on readline() here would block until the pipe is closed).
while log_for:
  for rfd, _event in poll.poll():
    chunk = os.read(rfd, 4096)
    if chunk:
      lines = (partial[rfd] + chunk).split(b'\n')
      # The last element is the text after the final newline (possibly empty).
      partial[rfd] = lines.pop()
      for line in lines:
        # Decoding whole lines avoids cutting a multi-byte character in half.
        log_for[rfd](line.decode('utf-8', errors='replace'))
      continue
    # An empty read means the writing end was closed: flush an unterminated
    # last line, then stop watching this descriptor.
    if partial[rfd]:
      log_for[rfd](partial[rfd].decode('utf-8', errors='replace'))
    poll.unregister(rfd)
    del log_for[rfd]

process.stdout.close()
process.stderr.close()
process.wait()

However I made these classes which are far better in my opinion but go a fair bit beyond the scope of this question. You'll probably want to edit out DEBUG:

code_executor.py

import logging, os, select, subprocess, sys, tempfile, pty
from colorama import Fore
from definitions import DEBUG
from typing import Dict, Optional, Any, List, Tuple
import TimeoutHandler
import FirstInFirstOutIO

class CodeExecutor:
  """Runs Python source text in a child interpreter and collects what it prints."""

  # TODO support Windows by using threading instead of signal.alarm
  def execute_code(self, code: str, live_output: bool= True, max_output_size: int = 1000, timeout_seconds: int = 10) -> Tuple[bool, str]:
    """Execute ``code`` in a subprocess and return ``(success, output)``.

    The code is written to a temporary ``.py`` file and run with
    ``sys.executable``. The child's stdout and stderr share one pseudo-terminal,
    so both streams end up interleaved in the same output string.

    Args:
      code: Python source to run.
      live_output: If True, the output of the code is also printed to stdout as
        it is generated. The full output string is returned either way.
      max_output_size: Maximum size of the output string (passed to
        ``FirstInFirstOutIO``). Helpful to prevent excessive memory usage, and
        to prevent the output from being too large to send to OpenAI.
      timeout_seconds: Maximum number of seconds the code is allowed to run
        before it is terminated (passed to ``TimeoutHandler``).

    Returns:
      A tuple ``(success, output)``. ``success`` is False when the timeout
      fired or when starting/reading the child raised an error; the child's
      exit status is not inspected.
    """
    # Function-scope imports keep this method self-contained.
    import codecs
    import errno

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # Setup the handler with a FirstInFirstOutIO object
    log_capture_string = FirstInFirstOutIO(max_output_size)
    handler = logging.StreamHandler(log_capture_string)
    logger.addHandler(handler)

    success = True
    # Pre-bind everything the except/finally clauses touch: the timeout can
    # fire before any of these has been assigned inside the try block.
    process = None
    master = slave = None
    # An incremental decoder copes with a multi-byte character that is split
    # across two os.read() calls.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    # Create a temporary file to store the provided code
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as temp_file:
      temp_file.write(code)
      temp_file.flush()
    try:
      with TimeoutHandler(timeout_seconds):
        master, slave = pty.openpty()
        # Use subprocess.Popen to run the code in the temporary file and capture stdout and stderr
        process = subprocess.Popen([sys.executable, temp_file.name], stdout=slave, stderr=slave, universal_newlines=True)
        # The child holds its own copy; closing ours lets the master see EOF.
        os.close(slave)
        slave = None
        timeout = 0.1  # A small timeout value for select
        while True:
          rlist, _, _ = select.select([master], [], [], timeout)
          if rlist:
            try:
              chunk = os.read(master, 1024)
            except OSError as err:
              # Linux signals "no writer left" on a pty master with EIO.
              # Anything else (including the TimeoutError raised by the alarm
              # handler, which is an OSError subclass) must propagate.
              if err.errno != errno.EIO:
                raise
              break
            if not chunk:
              break
            # NOTE: a line that straddles two reads is emitted as two records.
            for line in decoder.decode(chunk).splitlines():
              if live_output:
                print(line)
              logger.info(line)
          elif process.poll() is not None:
            # Only stop on process exit once nothing is left to read, so the
            # tail of the output is not dropped.
            break

    except TimeoutError:
      if process is not None:
        process.kill()
        process.wait()  # reap the killed child
      # Handle timeout errors by appending a timeout error message to the logger and setting success to false
      message=f"Provided code took too long to finish execution. TimeoutError: Timeout after {timeout_seconds} seconds."
      logger.error(message)
      if live_output:
        print(message)
      success = False
    except (subprocess.SubprocessError, OSError) as e:
      # Handle errors in the subprocess by appending the error message to the logger and setting success to false.
      # Popen reports launch failures as OSError; CalledProcessError is a SubprocessError.
      message=f"Error executing code: {str(e)}"
      logger.error(message)
      if live_output:
        print(message)
      success = False
    finally:
      # Release whichever pty descriptors are still open
      for fd in (master, slave):
        if fd is not None:
          os.close(fd)
      # Remove the temporary file after execution
      os.remove(temp_file.name)
      output_string = log_capture_string.getvalue()
      log_capture_string.close()
      logger.removeHandler(handler) # Just being explicit here
      if DEBUG:
        print(f"{Fore.YELLOW} Would you like to see the output of the code? (y/n){Fore.RESET}")
        if input().lower() == 'y':
          print(output_string)
      # NOTE: returning from ``finally`` discards any exception still in
      # flight, so this method always returns the tuple.
      return success, output_string

first_in_first_out_io.py

import io, collections

class FirstInFirstOutIO(io.TextIOBase):
    """Write-only text stream that retains only the most recent chunks.

    Every ``write`` appends one chunk. While the combined length of the
    retained chunks exceeds ``size``, whole chunks are discarded from the
    oldest end. ``size=None`` disables the limit.
    """

    def __init__(self, size, *args):
        self.maxsize = size
        super().__init__(*args)
        self.deque = collections.deque()
        # Running total of len() over the retained chunks, so that trimming
        # does not have to re-sum the whole deque on every write.
        self._size = 0

    def writable(self):
        """Report that the stream accepts ``write`` calls."""
        return True

    def getvalue(self):
        """Return the retained chunks joined into a single string."""
        return ''.join(self.deque)

    def write(self, x):
        """Append ``x``, trim to the size limit, and return ``len(x)``.

        A chunk longer than the limit is itself discarded by the trim; the
        return value still reports the number of characters accepted, as the
        ``TextIOBase.write`` contract requires.
        """
        self.deque.append(x)
        self._size += len(x)
        self.shrink()
        return len(x)

    def shrink(self):
        """Drop the oldest chunks until the total length fits ``maxsize``."""
        if self.maxsize is None:
            return
        while self.deque and self._size > self.maxsize:
            self._size -= len(self.deque.popleft())

timeout_handler.py

import signal
import sys

# This is a context manager that will raise a TimeoutError if the code inside 
# the context manager takes longer than the given number of seconds
class TimeoutHandler:
  """Context manager that raises ``TimeoutError`` when its body runs too long.

  It arms ``signal.alarm`` on entry and disarms it on exit. The SIGALRM
  handler that was installed before entry is put back on exit, so using this
  class does not permanently replace another component's handler. On Windows
  (no SIGALRM) the context manager does nothing.
  """

  def __init__(self, seconds: int):
    self.seconds = seconds
    # Handler that was active before __enter__; restored in __exit__.
    self._previous_handler = None

  def __enter__(self):
    if sys.platform == "win32":
      # Windows does not support SIGALRM, so skip the timeout handling
      return self
    # signal.signal() returns the handler it replaces.
    self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
    signal.alarm(self.seconds)
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    if sys.platform != "win32":
      # Cancel the pending alarm first so it cannot fire during the restore.
      signal.alarm(0)
      # None means the old handler was not installed from Python and cannot
      # be re-installed through signal.signal().
      if self._previous_handler is not None:
        signal.signal(signal.SIGALRM, self._previous_handler)
        self._previous_handler = None

  def handle_timeout(self, signum, frame):
    """SIGALRM handler: abort the guarded body with ``TimeoutError``."""
    raise TimeoutError(f"Timeout after {self.seconds} seconds.")

Upvotes: 4

jfs
jfs

Reputation: 414865

At the moment all other answers don't handle buffering on the child subprocess' side if the subprocess is not a Python script that accepts -u flag. See "Q: Why not just use a pipe (popen())?" in the pexpect documentation.

To simulate -u flag for some of C stdio-based (FILE*) programs you could try stdbuf.

If you ignore this then your output won't be properly interleaved and might look like:

stderr
stderr
...large block of stdout including parts that are printed before stderr...

You could try it with the following client program, notice the difference with/without -u flag (['stdbuf', '-o', 'L', 'child_program'] also fixes the output):

#!/usr/bin/env python
from __future__ import print_function
import random
import sys
import time
from datetime import datetime

def tprint(msg, file=sys.stdout):
    """Print ``msg`` to ``file`` after a short random pause.

    The pause lasts up to a tenth of a second. The printed line starts with
    the current UTC seconds and microseconds, followed by a space and ``msg``.
    """
    delay = random.random() * .1
    time.sleep(delay)
    stamp = datetime.utcnow().strftime('%S.%f')
    print("%s %s" % (stamp, msg), file=file)

# Emit two stdout lines, then five stderr lines, then one more stdout line,
# so the reader can check whether the original order survives the capture.
for text in ("stdout1 before stderr", "stdout2 before stderr"):
    tprint(text)
for n in range(5):
    tprint('stderr%d' % n, file=sys.stderr)
tprint("stdout3 after stderr")

On Linux you could use pty to get the same behavior as when the subprocess runs interactively e.g., here's a modified @T.Rojan's answer:

import logging, os, select, subprocess, sys, pty

# Run `args` with stdout attached to a pseudo-terminal and stderr attached to
# a pipe, then log stdout lines at INFO and stderr lines at ERROR in the order
# in which poll() reports them. The script exits with the child's exit code.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The child writes to slave_fd; this process reads the same data from master_fd.
master_fd, slave_fd = pty.openpty()
p = subprocess.Popen(args,stdout=slave_fd, stderr=subprocess.PIPE, close_fds=True)
# NOTE(review): slave_fd is not closed in this process anywhere in this
# snippet; stdout is registered for POLLIN only, and the loop ends via the
# "no events and child exited" check below.
with os.fdopen(master_fd) as stdout:
    poll = select.poll()
    poll.register(stdout, select.POLLIN)
    poll.register(p.stderr,select.POLLIN | select.POLLHUP)

    # The mutable default argument is deliberate here: the shared list acts as
    # a "has already run" flag, so only the first call does any work.
    def cleanup(_done=[]):
        if _done: return
        _done.append(1)
        poll.unregister(p.stderr)
        p.stderr.close()
        poll.unregister(stdout)
        assert p.poll() is not None

    # fd -> (function that reads one line, logger method that records it)
    read_write = {stdout.fileno(): (stdout.readline, logger.info),
                  p.stderr.fileno(): (p.stderr.readline, logger.error)}
    while True:
        events = poll.poll(40) # poll with a small timeout to avoid both
                               # blocking forever and a busy loop
        if not events and p.poll() is not None:
            # no IO events and the subprocess exited
            cleanup()
            break

        for fd, event in events:
            if event & select.POLLIN: # there is something to read
                read, write = read_write[fd]
                # readline() waits for a complete line (or EOF)
                line = read()
                if line:
                    write(line.rstrip())
            elif event & select.POLLHUP: # free resources if stderr hung up
                cleanup()
            else: # something unexpected happened
                assert 0
sys.exit(p.wait()) # return child's exit code

It assumes that stderr is always unbuffered/line-buffered and stdout is line-buffered in an interactive mode. Only full lines are read. The program might block if there are non-terminated lines in the output.

Upvotes: 3

mossman
mossman

Reputation: 824

# stderr=subprocess.STDOUT sends the child's stderr into the same pipe as its
# stdout, so both streams are read back through tsk.stdout.
tsk = subprocess.Popen(
    args,
    stderr=subprocess.STDOUT,
    stdout=subprocess.PIPE,
)

subprocess.STDOUT is a special flag that tells subprocess to route all stderr output to stdout, thus combining your two streams.

btw, select doesn't have a poll() in windows. subprocess only uses the file handle number, and doesn't call your file output object's write method.

to capture the output, do something like:

# Binary mode: a pipe opened by Popen without a text-mode option yields bytes.
# The context manager closes the log file even if a read or write fails.
with open(logfilename, 'wb') as logfile:
    # Read until EOF rather than until the process exits: output that is
    # still in the pipe when the child terminates would otherwise be lost.
    for line in iter(tsk.stdout.readline, b''):
        logfile.write(line)
# Reap the child and make tsk.returncode available.
tsk.wait()

Upvotes: 70

guyincognito
guyincognito

Reputation: 203

I found myself having to tackle this problem recently, and it took a while to get something I felt worked correctly in most cases, so here it is! (It also has the nice side effect of processing the output via a python logger, which I've noticed is another common question here on Stackoverflow).

Here is the code:

import sys
import logging
import subprocess
from threading import Thread

# Log records go to this script's stdout. Two custom levels just above INFO
# give the child's streams their own labels in the formatted output.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.addLevelName(logging.INFO + 1, 'STDOUT')
logging.addLevelName(logging.INFO + 2, 'STDERR')
logger = logging.getLogger('root')

# Child command chosen because it writes to both stdout and stderr.
pobj = subprocess.Popen(
    ['python', '-c', 'print 42;bargle'],
    stderr=subprocess.PIPE,
    stdout=subprocess.PIPE,
)

def logstream(stream,loggercb):
    """Pass every line of ``stream`` to ``loggercb`` until the stream ends.

    Trailing whitespace (including the newline) is stripped from each line
    before it is handed to the callback. Reading stops at the first empty
    result from ``readline()``, which signals end of stream.
    """
    line = stream.readline()
    while line:
        loggercb(line.rstrip())
        line = stream.readline()

# One reader thread per pipe, so a blocking read on one stream never delays
# the other. Each thread tags its lines with that stream's custom log level.
stdout_thread = Thread(target=logstream,
    args=(pobj.stdout,lambda s: logger.log(logging.INFO+1,s)))

stderr_thread = Thread(target=logstream,
    args=(pobj.stderr,lambda s: logger.log(logging.INFO+2,s)))

stdout_thread.start()
stderr_thread.start()

# Wait for BOTH readers to reach end of stream. join() sleeps instead of
# spinning, and it does not rely on Thread.isAlive(), which newer Python
# versions no longer provide.
stdout_thread.join()
stderr_thread.join()
# Reap the child so pobj.returncode is set.
pobj.wait()

Here is the output:

STDOUT:root:42
STDERR:root:Traceback (most recent call last):
STDERR:root:  File "<string>", line 1, in <module>
STDERR:root:NameError: name 'bargle' is not defined

You can replace the subprocess call to do whatever you want, I just chose running python with a command that I knew would print to both stdout and stderr. The key bit is reading stderr and stdout each in a separate thread. Otherwise you may be blocking on reading one while there is data ready to be read on the other.

Upvotes: 14

T.Rojan
T.Rojan

Reputation: 111

If you want to interleave to get roughly the same order that you would if you ran the process interactively then you need to do what the shell does and poll stdout/stderr and write in the order that they poll.

Here's some code that does something along the lines of what you want - in this case sending the stdout/stderr to a logger info/error streams.

import os  # os.read() reads the pipes without going through a read-ahead buffer

tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

# Route each pipe's file descriptor to the logger call for that stream.
emit = {
  tsk.stdout.fileno(): logger.info,
  tsk.stderr.fileno(): logger.error,
}
# Data received after the last newline of each stream (an unfinished line).
pending = dict.fromkeys(emit, b'')

poll = select.poll()
for fd in emit:
  poll.register(fd, select.POLLIN | select.POLLHUP)

# Reading the descriptors directly matters: a buffered readline() may pull
# several lines out of the pipe at once, and poll() cannot see what is left
# inside that buffer, so those lines would be delayed or lost at hang-up.
while emit:
  for rfd, _event in poll.poll():
    chunk = os.read(rfd, 4096)
    if chunk:
      lines = (pending[rfd] + chunk).split(b'\n')
      # The last element is whatever follows the final newline.
      pending[rfd] = lines.pop()
      for line in lines:
        # Lines are passed on undecoded, without their trailing newline.
        emit[rfd](line)
      continue
    # An empty read means the writer closed its end: flush an unterminated
    # last line and stop polling this descriptor.
    if pending[rfd]:
      emit[rfd](pending[rfd])
    poll.unregister(rfd)
    del emit[rfd]
tsk.wait()

Upvotes: 9

Guard
Guard

Reputation: 6955

I suggest you write your own handlers, something like (not tested, I hope you catch the idea):

class my_buffer(object):
    """Wrapper around a file object that prefixes everything written to it."""

    def __init__(self, fileobject, prefix):
        self.prefix = prefix
        self._fileobject = fileobject

    def write(self, text):
        """Write ``prefix``, a space and ``text``; return the wrapped write's result."""
        tagged = '%s %s' % (self.prefix, text)
        return self._fileobject.write(tagged)
    # delegate other methods to fileobject if necessary

# Popen passes stdout/stderr to the child as OS-level file descriptors: it
# calls fileno() on the object and never its write() method. A my_buffer
# instance has no fileno(), so it cannot be handed to Popen directly.
# Instead, capture both streams through pipes and push each captured line
# through the prefixing wrappers afterwards.
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     shell=True, universal_newlines=True)
captured_out, captured_err = p.communicate()

with open('log.log', 'w') as log_file:
    my_out = my_buffer(log_file, 'OK:')
    my_err = my_buffer(log_file, '!!!ERROR:')
    # NOTE: all stdout lines are written before all stderr lines; the
    # relative order between the two streams is not preserved here.
    for line in captured_out.splitlines(True):
        my_out.write(line)
    for line in captured_err.splitlines(True):
        my_err.write(line)

Upvotes: 1

Cinquo
Cinquo

Reputation: 663

You may write the stdout/err to a file after the command execution. In the example below I use pickling so I am sure I will be able to read without any particular parsing to differentiate between the stdout/err and at some point I could dump the exitcode and the command itself.

import subprocess
try:
    import cPickle as pickle  # Python 2: C implementation of pickle
except ImportError:
    import pickle  # Python 3: cPickle no longer exists as a separate module

# Run the command, wait for it to finish, and keep both streams separately.
command = 'ls -altrh'
outfile = 'log.errout'
pipe = subprocess.Popen(command, stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE, shell = True)
stdout, stderr = pipe.communicate()

# Pickle data is binary, so the file must be opened in 'wb' mode; the context
# manager closes it even if dumping fails.
with open(outfile, 'wb') as f:
    pickle.dump({'out': stdout, 'err': stderr}, f)

Upvotes: 0

Related Questions