Reputation: 941
I would like to direct a Python script's subprocess' stdout and stderr into the same file. What I don't know is how to make the lines from the two sources distinguishable. (For example, prefix the lines from stderr with an exclamation mark.)
In my particular case there is no need for live monitoring of the subprocess, the executing Python script can wait for the end of its execution.
Upvotes: 60
Views: 29799
Reputation: 1969
Improving on T.Rojan's code so it works when stderr or stdout receive content longer than one line.
# Run the code in the temporary file and forward its stdout lines to
# logger.info and its stderr lines to logger.error, in arrival order.
import os  # local import: the pipes are read with os.read below

process = subprocess.Popen([sys.executable, temp_file.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Map each pipe's file descriptor to the logger method that receives its lines.
sinks = {
    process.stdout.fileno(): logger.info,
    process.stderr.fileno(): logger.error,
}
# Bytes already read from each pipe that do not yet form a complete line.
pending = {fd: b'' for fd in sinks}

poll = select.poll()
for fd in sinks:
    poll.register(fd, select.POLLIN | select.POLLHUP)

# Loop until both pipes have reported end-of-file.
while sinks:
    for rfd, event in poll.poll():
        # Read whatever is available right now instead of calling readline()
        # until EOF: a readline loop blocks on one pipe while the other one
        # fills up, which loses the interleaving and can deadlock the child.
        try:
            chunk = os.read(rfd, 4096)
        except OSError:
            chunk = b''
        if chunk:
            lines = (pending[rfd] + chunk).split(b'\n')
            # The last element is the (possibly empty) unterminated tail.
            pending[rfd] = lines.pop()
            for line in lines:
                # split() already removed the newline, so nothing is sliced off.
                sinks[rfd](line.decode('utf-8', errors='replace'))
            continue
        # An empty read means the child closed this pipe: flush a final line
        # that had no trailing newline, then stop watching the descriptor.
        if pending[rfd]:
            sinks[rfd](pending[rfd].decode('utf-8', errors='replace'))
        poll.unregister(rfd)
        del sinks[rfd]
        del pending[rfd]

process.wait()
However I made these classes which are far better in my opinion but go a fair bit beyond the scope of this question. You'll probably want to edit out DEBUG:
code_executor.py
import logging, os, select, subprocess, sys, tempfile, pty
from colorama import Fore
from definitions import DEBUG
from typing import Dict, Optional, Any, List, Tuple
import TimeoutHandler
import FirstInFirstOutIO
class CodeExecutor:
    """Run a string of Python source in a child interpreter and capture its output.

    NOTE(review): ``execute_code`` calls ``TimeoutHandler(...)`` and
    ``FirstInFirstOutIO(...)`` directly, so those names must be bound to the
    classes themselves; a plain ``import TimeoutHandler`` binds a module
    instead -- confirm the file-level imports.
    """

    def execute_code(self, code: str, live_output: bool = True, max_output_size: int = 1000, timeout_seconds: int = 10) -> Tuple[bool, str]:
        """Execute *code* in a subprocess and return ``(success, output)``.

        Args:
            code: Python source text; it is written to a temporary ``.py`` file.
            live_output: If True, each output line is also printed as it is
                generated. The full output string is returned either way.
            max_output_size: Maximum size of the returned output string
                (passed to ``FirstInFirstOutIO``). Helps prevent excessive
                memory usage and overly large outputs.
            timeout_seconds: Maximum number of seconds the code may run before
                it is killed. TODO support Windows by using threading instead
                of signal.alarm.

        Returns:
            A tuple ``(success, output_string)``. ``success`` is False when the
            run timed out or a ``subprocess.CalledProcessError`` was caught.
        """
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.DEBUG)
        # Setup the handler with a FirstInFirstOutIO object
        log_capture_string = FirstInFirstOutIO(max_output_size)
        handler = logging.StreamHandler(log_capture_string)
        logger.addHandler(handler)
        success = True
        process = None
        master = None
        pending = b''  # bytes read from the pty that do not yet end in a newline

        def emit(raw_line: bytes) -> None:
            # A pty normally turns "\n" into "\r\n"; drop the carriage return.
            line = raw_line.rstrip(b'\r').decode('utf-8', errors='replace')
            if live_output:
                print(line)
            logger.info(line)

        try:
            # Create a temporary file to store the provided code
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as temp_file:
                temp_file.write(code)
                temp_file.flush()
                try:
                    with TimeoutHandler(timeout_seconds):
                        master, slave = pty.openpty()
                        try:
                            # Run the file with stdout and stderr both attached to the pty
                            process = subprocess.Popen([sys.executable, temp_file.name], stdout=slave, stderr=slave)
                        finally:
                            # The parent must not keep the slave end open,
                            # otherwise the master never reports end-of-file.
                            os.close(slave)
                        timeout = 0.1  # A small timeout value for select
                        while True:
                            rlist, _, _ = select.select([master], [], [], timeout)
                            if not rlist:
                                # Nothing left to read: stop only once the child is gone,
                                # so output written just before exit is not dropped.
                                if process.poll() is not None:
                                    break
                                continue
                            try:
                                data = os.read(master, 1024)
                            except TimeoutError:
                                # TimeoutError subclasses OSError; let the timeout through.
                                raise
                            except OSError:
                                # Reading a pty master whose slave is closed can fail
                                # instead of returning b''; treat it as end-of-file.
                                data = b''
                            if not data:
                                break
                            # Decode whole lines only, so a 1024-byte chunk boundary
                            # cannot split a line or a multi-byte character.
                            lines = (pending + data).split(b'\n')
                            pending = lines.pop()
                            for raw_line in lines:
                                emit(raw_line)
                        if pending:
                            emit(pending)
                            pending = b''
                        # Reap the child while the timeout is still armed.
                        process.wait()
                except TimeoutError:
                    if process is not None:
                        process.kill()
                        process.wait()  # reap the killed child
                    if pending:
                        emit(pending)
                    # Handle timeout errors by appending a timeout error message to the logger and setting success to false
                    message=f"Provided code took too long to finish execution. TimeoutError: Timeout after {timeout_seconds} seconds."
                    logger.error(message)
                    if live_output:
                        print(message)
                    success = False
                except subprocess.CalledProcessError as e:
                    # NOTE(review): nothing in this block appears to raise
                    # CalledProcessError; kept for backward compatibility.
                    message=f"Error executing code: {str(e)}"
                    logger.error(message)
                    if live_output:
                        print(message)
                    success = False
                finally:
                    # Remove the temporary file after execution
                    os.remove(temp_file.name)
                    if master is not None:
                        os.close(master)
            output_string = log_capture_string.getvalue()
        finally:
            # Detach the capture handler even when an unexpected error escapes,
            # so repeated calls do not accumulate handlers on the logger.
            logger.removeHandler(handler)
            log_capture_string.close()
        if DEBUG:
            print(f"{Fore.YELLOW} Would you like to see the output of the code? (y/n){Fore.RESET}")
            if input().lower() == 'y':
                print(output_string)
        return success, output_string
first_in_first_out_io.py
import io, collections
class FirstInFirstOutIO(io.TextIOBase):
    """Write-only text stream that keeps only the most recently written chunks.

    Every ``write`` appends one chunk. Whenever the total length of the stored
    chunks exceeds ``maxsize``, whole chunks are discarded from the oldest end
    until the total fits again. ``maxsize=None`` disables the limit.
    """

    def __init__(self, size, *args):
        """Create the stream; *size* is the maximum total length (or None)."""
        self.maxsize = size
        io.TextIOBase.__init__(self, *args)
        self.deque = collections.deque()
        # Running total of len(chunk) over self.deque, so that write() does
        # not have to re-sum every stored chunk on each call.
        self._size = 0

    def getvalue(self):
        """Return the retained chunks joined into one string."""
        return ''.join(self.deque)

    def write(self, x):
        """Append *x*, evict old chunks if needed, and return ``len(x)``."""
        self.deque.append(x)
        self._size += len(x)
        self.shrink()
        # Text streams report the number of characters written.
        return len(x)

    def shrink(self):
        """Drop the oldest chunks until the total length is within maxsize."""
        if self.maxsize is None:
            return
        # The emptiness guard avoids an IndexError for a negative maxsize.
        while self._size > self.maxsize and self.deque:
            self._size -= len(self.deque.popleft())
timeout_handler.py
import signal
import sys
# This is a context manager that will raise a TimeoutError if the code inside
# the context manager takes longer than the given number of seconds
class TimeoutHandler:
    """Context manager that raises TimeoutError when its body runs too long.

    It arms ``signal.alarm`` on entry and disarms it on exit. On Windows
    (``sys.platform == "win32"``) it does nothing, because SIGALRM is not
    available there.
    """

    def __init__(self, seconds: int):
        """Store the time limit, in whole seconds."""
        self.seconds = seconds
        # SIGALRM handler that was installed before __enter__; restored on exit.
        self._previous_handler = None

    def __enter__(self):
        """Install the SIGALRM handler and start the countdown."""
        if sys.platform == "win32":
            # Windows does not support SIGALRM, so skip the timeout handling
            return self
        # signal.signal returns the handler it replaces; keep it for __exit__.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Cancel the pending alarm and put the previous handler back."""
        if sys.platform != "win32":
            signal.alarm(0)
            # Do not leave this object's handler installed process-wide.
            if self._previous_handler is not None:
                signal.signal(signal.SIGALRM, self._previous_handler)
                self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """Signal handler: abort the guarded code with TimeoutError."""
        raise TimeoutError(f"Timeout after {self.seconds} seconds.")
Upvotes: 4
Reputation: 414865
At the moment all other answers don't handle buffering on the child subprocess' side if the subprocess is not a Python script that accepts -u
flag. See "Q: Why not just use a pipe (popen())?" in the pexpect documentation.
To simulate -u
flag for some of C stdio-based (FILE*
) programs you could try stdbuf
.
If you ignore this then your output won't be properly interleaved and might look like:
stderr
stderr
...large block of stdout including parts that are printed before stderr...
You could try it with the following client program, notice the difference with/without -u
flag (['stdbuf', '-o', 'L', 'child_program']
also fixes the output):
#!/usr/bin/env python
from __future__ import print_function
import random
import sys
import time
from datetime import datetime
def tprint(msg, file=sys.stdout):
    """Sleep for a random time of up to 0.1 s, then print *msg* with a timestamp.

    The line is prefixed with the current UTC seconds and microseconds
    (``SS.ffffff``) and written to *file*.
    """
    delay = .1 * random.random()
    time.sleep(delay)
    stamp = datetime.utcnow().strftime('%S.%f')
    line = "%s %s" % (stamp, msg)
    print(line, file=file)
# Two lines on stdout, then five on stderr, then one more on stdout, so that
# incorrect interleaving of the two streams is easy to spot in the output.
for early_message in ("stdout1 before stderr", "stdout2 before stderr"):
    tprint(early_message)
for x in range(5):
    stderr_message = 'stderr%d' % x
    tprint(stderr_message, file=sys.stderr)
tprint("stdout3 after stderr")
On Linux you could use pty
to get the same behavior as when the subprocess runs interactively e.g., here's a modified @T.Rojan's answer:
import logging, os, select, subprocess, sys, pty
# Run `args` with its stdout attached to a pseudo-terminal and its stderr to a
# pipe, then forward stdout lines to logger.info and stderr lines to
# logger.error until the child has exited.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# The child writes stdout to the slave end; this process reads the master end.
master_fd, slave_fd = pty.openpty()
# NOTE(review): slave_fd is not closed in this process anywhere in this block,
# so the loop below relies on p.poll() rather than on end-of-file -- confirm.
p = subprocess.Popen(args,stdout=slave_fd, stderr=subprocess.PIPE, close_fds=True)
with os.fdopen(master_fd) as stdout:
    poll = select.poll()
    poll.register(stdout, select.POLLIN)
    poll.register(p.stderr,select.POLLIN | select.POLLHUP)
    def cleanup(_done=[]):
        # The mutable default argument is used on purpose as a "run once"
        # flag: after the first call _done is non-empty and later calls return.
        if _done: return
        _done.append(1)
        poll.unregister(p.stderr)
        p.stderr.close()
        poll.unregister(stdout)
        # cleanup() is expected to run only after the child has exited.
        assert p.poll() is not None
    # Map each descriptor to (function that reads a line, function that logs it).
    read_write = {stdout.fileno(): (stdout.readline, logger.info),
                  p.stderr.fileno(): (p.stderr.readline, logger.error)}
    while True:
        events = poll.poll(40) # poll with a small timeout to avoid both
                               # blocking forever and a busy loop
        if not events and p.poll() is not None:
            # no IO events and the subprocess exited
            cleanup()
            break
        for fd, event in events:
            if event & select.POLLIN: # there is something to read
                read, write = read_write[fd]
                # readline() returns only at a newline or end-of-file, so a
                # partial line can block here.
                line = read()
                if line:
                    write(line.rstrip())
            elif event & select.POLLHUP: # free resources if stderr hung up
                cleanup()
            else: # something unexpected happened
                assert 0
sys.exit(p.wait()) # return child's exit code
It assumes that stderr is always unbuffered/line-buffered and stdout is line-buffered in an interactive mode. Only full lines are read. The program might block if there are non-terminated lines in the output.
Upvotes: 3
Reputation: 824
# Start the child with stderr merged into the stdout pipe.
tsk = subprocess.Popen(
    args,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
subprocess.STDOUT
is a special flag that tells subprocess to route all stderr output to stdout, thus combining your two streams.
btw, select doesn't have a poll() in windows. subprocess only uses the file handle number, and doesn't call your file output object's write method.
to capture the output, do something like:
# Copy everything the child writes into the log file.
# The pipe yields bytes, so the file is opened in binary mode. The `with`
# block guarantees the file is flushed and closed.
with open(logfilename, 'wb') as logfile:
    # Read until end-of-file rather than until the child exits; output still
    # sitting in the pipe when the child terminates would otherwise be lost.
    for line in iter(tsk.stdout.readline, b''):
        logfile.write(line)
# Collect the exit status so the child does not linger as a zombie.
tsk.wait()
Upvotes: 70
Reputation: 203
I found myself having to tackle this problem recently, and it took a while to get something I felt worked correctly in most cases, so here it is! (It also has the nice side effect of processing the output via a python logger, which I've noticed is another common question here on Stackoverflow).
Here is the code:
import sys
import logging
import subprocess
from threading import Thread
# Log to stdout and register two custom level names just above INFO so that
# each record shows which of the child's streams it came from.
logging.basicConfig(stream=sys.stdout,level=logging.INFO)
logging.addLevelName(logging.INFO+2,'STDERR')
logging.addLevelName(logging.INFO+1,'STDOUT')
logger = logging.getLogger('root')
# Demo child: prints 42 on stdout, then fails with a NameError on stderr.
# print(42) is valid in both Python 2 and Python 3; the statement form
# `print 42` is a SyntaxError under Python 3, so nothing reached stdout.
pobj = subprocess.Popen(['python','-c','print(42);bargle'],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def logstream(stream,loggercb):
    """Feed every line of *stream* to *loggercb* until end-of-file.

    Each line is passed with trailing whitespace removed. Reading stops at
    the first empty result from ``readline()``.
    """
    chunk = stream.readline()
    while chunk:
        loggercb(chunk.rstrip())
        chunk = stream.readline()
# Drain each pipe in its own thread so that a blocking read on one stream
# never delays data that is already available on the other.
stdout_thread = Thread(target=logstream,
                       args=(pobj.stdout,lambda s: logger.log(logging.INFO+1,s)))
stderr_thread = Thread(target=logstream,
                       args=(pobj.stderr,lambda s: logger.log(logging.INFO+2,s)))
stdout_thread.start()
stderr_thread.start()
# Wait for BOTH readers with join(). A `while a.isAlive() and b.isAlive(): pass`
# loop spins the CPU and stops as soon as either thread ends, and
# Thread.isAlive() is not available in current Python versions.
stdout_thread.join()
stderr_thread.join()
# Both pipes are at end-of-file; collect the child's exit status.
pobj.wait()
Here is the output:
STDOUT:root:42
STDERR:root:Traceback (most recent call last):
STDERR:root: File "<string>", line 1, in <module>
STDERR:root:NameError: name 'bargle' is not defined
You can replace the subprocess call to do whatever you want, I just chose running python with a command that I knew would print to both stdout and stderr. The key bit is reading stderr and stdout each in a separate thread. Otherwise you may be blocking on reading one while there is data ready to be read on the other.
Upvotes: 14
Reputation: 111
If you want to interleave the output so that you get roughly the same order you would see when running the process interactively, then you need to do what the shell does: poll stdout/stderr and write the lines in the order in which they become ready.
Here's some code that does something along the lines of what you want - in this case sending the stdout/stderr to a logger info/error streams.
# Run `args` and forward its stdout lines to logger.info and its stderr lines
# to logger.error, roughly in the order in which they are produced.
import os  # local import: the pipes are read with os.read below

tsk = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

# File descriptor -> logging callback for lines arriving on that descriptor.
writers = {
    tsk.stdout.fileno(): logger.info,
    tsk.stderr.fileno(): logger.error,
}
# File descriptor -> bytes received so far that are not yet a full line.
partial = dict((fd, b'') for fd in writers)

poll = select.poll()
for fd in writers:
    poll.register(fd, select.POLLIN | select.POLLHUP)

while writers:
    for rfd, event in poll.poll():
        # Read the descriptor directly. A buffered readline() may pull several
        # lines into its private buffer while returning only one; poll() then
        # never reports the rest, so multi-line output was lost or delayed.
        try:
            data = os.read(rfd, 4096)
        except OSError:
            data = b''
        if data:
            pieces = (partial[rfd] + data).split(b'\n')
            partial[rfd] = pieces.pop()  # unterminated tail, possibly empty
            for piece in pieces:
                writers[rfd](piece.decode('utf-8', 'replace'))
            continue
        # Empty read: the child closed this pipe. Emit any last line that had
        # no trailing newline and stop polling this descriptor.
        if partial[rfd]:
            writers[rfd](partial[rfd].decode('utf-8', 'replace'))
        poll.unregister(rfd)
        del writers[rfd]
        del partial[rfd]

tsk.wait()
Upvotes: 9
Reputation: 6955
I suggest you write your own handlers, something like (not tested, I hope you catch the idea):
class my_buffer(object):
    """File-like wrapper that prepends a fixed prefix to each write."""

    def __init__(self, fileobject, prefix):
        """Remember the wrapped file object and the prefix to prepend."""
        self.prefix = prefix
        self._fileobject = fileobject

    def write(self, text):
        """Write ``"<prefix> <text>"`` to the wrapped file and return its result."""
        tagged = '%s %s' % (self.prefix, text)
        return self._fileobject.write(tagged)

    # delegate other methods to fileobject if necessary
# Run `command` and write its output to log.log, prefixing every stdout line
# with 'OK:' and every stderr line with '!!!ERROR:'.
with open('log.log', 'w') as log_file:
    my_out = my_buffer(log_file, 'OK:')
    my_err = my_buffer(log_file, '!!!ERROR:')
    # Popen needs real file descriptors and never calls a wrapper's write(),
    # so my_out/my_err cannot be passed as stdout/stderr. Capture both
    # streams through pipes and push the text through the wrappers afterwards.
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         shell=True, universal_newlines=True)
    out_text, err_text = p.communicate()
    # NOTE: the streams are collected separately, so the original ordering
    # between stdout and stderr lines is not preserved.
    for line in out_text.splitlines(True):
        my_out.write(line)
    for line in err_text.splitlines(True):
        my_err.write(line)
Upvotes: 1
Reputation: 663
You may write the stdout/err to a file after the command has finished executing. In the example below I use pickling, so I can be sure I will be able to read the data back without any particular parsing to differentiate between stdout and stderr, and at some point I could also dump the exit code and the command itself.
import subprocess
import cPickle
# Run a shell command, wait for it to finish, and store its captured stdout
# and stderr in one pickle file under the keys 'out' and 'err'.
command = 'ls -altrh'
outfile = 'log.errout'
pipe = subprocess.Popen(command, stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE, shell = True)
stdout, stderr = pipe.communicate()
# Pickle data is binary, so the file is opened in 'wb' mode. The `with`
# block closes the file even if dumping raises.
with open(outfile, 'wb') as f:
    cPickle.dump({'out': stdout, 'err': stderr},f)
Upvotes: 0