Reputation: 11074
I'm using the subprocess module to start a subprocess and connect to its output stream (standard output). I want to be able to execute non-blocking reads on its standard output. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline
? I'd like this to be portable or at least work under Windows and Linux.
Here is how I do it for now (it's blocking on the .readline
if no data is available):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Upvotes: 640
Views: 337064
Reputation: 55
For anyone who visit here from web search:
The real non-blocking buffer reader way in python, is to use pexpect(for linux/macOS) or wexpect(for windows)
After pip install pexpect
on linux, test these .py
scripts!
#! /usr/bin/env -S conda run --live-stream -n base python
import pexpect
import time
def write(process, message):
process.sendline(message.strip())
def terminate(process):
process.close(force=True)
cmd = 'notify-send Title Body --action=button --print-id'
process = pexpect.spawn(cmd)
process.delaybeforesend = None # No pre-waiting
while True:
try:
output = process.read_nonblocking(size=1024).decode("utf-8").strip()
if output:
print(output)
time.sleep(0.5)
except pexpect.exceptions.TIMEOUT:
continue
except Exception as e:
print(e)
break
terminate(process)
output 1
or other numbers in hurry(immediately)!
After you click button
, output 0
, then EOF and exit
Not always listening
āÆ ./test.py
1
0
End Of File (EOF). Exception style platform.
import subprocess as sp
import time
import os
cmd = 'notify-send Title Body --action=button --print-id'.split()
with sp.Popen(
cmd, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as p:
os.set_blocking(p.stdout.fileno(), False)
while True:
print('š',p.stdout.readline().decode("utf-8").strip())
time.sleep(0.5)
if you've tried subprocess.Popen()
os.set_blocking(process.stdout.fileno(), False)
selectors
and any other python self-contained module, it's NOT possible to output 1
or other numbers immediately!
Always listening to stdout
āÆ ./test.py
š
š
š 2
š 0
š
š
^CTraceback (most recent call last):
File "./test.py", line 13, in <module>
time.sleep(0.5)
KeyboardInterrupt
CondaError: KeyboardInterrupt
having read the comments in this question above, it seems designed deliberately to wait user's first input...
Upvotes: 0
Reputation: 25283
On Unix-like systems and Python 3.5+ there's os.set_blocking
which does exactly what it says.
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
This outputs:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
With os.set_blocking
commented it's:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Upvotes: 86
Reputation: 536
This one is pure Python, easy to understand and works just fine:
import subprocess
import sys
import threading
# Function to read and print stdout of the subprocess
def readstdout():
for l in iter(p.stdout.readline, b""):
sys.stdout.write(f'{l.decode("utf-8", "backslashreplace")}\n')
# Function to read and print stderr of the subprocess
def readstderr():
for l in iter(p.stderr.readline, b""):
sys.stderr.write(f'{l.decode("utf-8", "backslashreplace")}\n')
# Function to send a command to the subprocess
def sendcommand(cmd):
p.stdin.write(cmd.encode() + b"\n")
p.stdin.flush()
# Create a subprocess
p = subprocess.Popen(
"adb.exe -s 127.0.0.1:5555 shell",
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Create two threads to read and print stdout and stderr concurrently
t1 = threading.Thread(target=readstdout)
t2 = threading.Thread(target=readstderr)
# Start the threads to capture and print the subprocess output
t1.start()
t2.start()
# Send a command to the subprocess
sendcommand("ls")
Upvotes: 1
Reputation: 127
I'm gonna let you know, there is one SUPER SIMPLE way to do this. I looked for quite a while, tried some of the complicated queue functions etc, all to stumble upon this comment which explains you just need to specify stdout=sys.stdout
in the Popen
import subprocess
import sys
def execute(command):
subprocess.Popen('myprogram.exe', shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)
Upvotes: 0
Reputation: 3026
Not the first and probably not the last, I have built a package that does non blocking stdout PIPE reads with two different methods, one being based on the work of J.F. Sebastian (@jfs)'s answer, the other being a simple communicate() loop with a thread to check for timeouts.
Both stdout capture methods are tested to work both under Linux and Windows, with Python versions from 2.7 to 3.9 as of the time of writing
Being non blocking, it guarantees timeout enforcement, even with multiple child and grandchild processes, and even under Python 2.7.
The package also handles both bytes and text stdout encodings, being a nightmare when trying to catch EOF.
You'll find the package at https://github.com/netinvent/command_runner
If you need some well tested non blocking read implementations, try it out (or hack the code):
pip install command_runner
from command_runner import command_runner
exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')
You can find the core non blocking read code in _poll_process()
or _monitor_process()
depending on the capture method employed.
From there, you can hack your way to what you want, or simply use the whole package to execute your commands as a subprocess replacement.
Upvotes: 4
Reputation: 1185
Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Why existing solutions did not work:
Upvotes: 7
Reputation: 49
I have the original questioner's problem, but did not wish to invoke threads. I mixed Jesse's solution with a direct read()
from the pipe, and my own buffer-handler for line reads (however, my sub-process - ping - always wrote full lines < a system page size). I avoid busy-waiting by only reading in a gobject-registered io watch. These days I usually run code within a gobject MainLoop to avoid threads.
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
The watcher is
def watch(f, *other):
print 'reading',f.read()
return True
And the main program sets up a ping and then calls gobject mail loop.
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Any other work is attached to callbacks in gobject.
Upvotes: 3
Reputation: 900
Here is a simple solution based on threads which:
select
).stdout
and stderr
asynchronouly.asyncio
(which may conflict with other libraries).printer.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
reader.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
Upvotes: 10
Reputation: 3752
Try wexpect, which is the windows alternative of pexpect.
import wexpect
p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.') // regex pattern of any character
output_str = p.after()
Upvotes: 1
Reputation: 3207
Things are a lot better in modern Python.
Here's a simple child program, "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
And a program to interact with it:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
That prints out:
b'hello bob\n'
b'hello alice\n'
Note that the actual pattern, which is also by almost all of the previous answers, both here and in related questions, is to set the child's stdout file descriptor to non-blocking and then poll it in some sort of select loop. These days, of course, that loop is provided by asyncio.
Upvotes: 18
Reputation: 414835
fcntl
, select
, asyncproc
won't help in this case.
A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Upvotes: 498
Reputation: 91
I also faced the problem described by Jesse and solved it by using "select" as Bradley, Andy and others did but in a blocking mode to avoid a busy loop. It uses a dummy Pipe as a fake stdin. The select blocks and wait for either stdin or the pipe to be ready. When a key is pressed stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe then the pipe unblocks the select and it can be taken as an indication that the need for stdin is over. Here is some reference code:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Upvotes: 0
Reputation: 1258
This solution uses the select
module to "read any available data" from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn't block further.
Given the fact that it uses the select
module, this only works on Unix.
The code is fully PEP8-compliant.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Upvotes: 0
Reputation: 473
My problem is a bit different as I wanted to collect both stdout and stderr from a running process, but ultimately the same since I wanted to render the output in a widget as its generated.
I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.
After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I'm using the select
function call.
I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen
and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.
The key was to set bufsize=1
for line buffering and then universal_newlines=True
to process as a text file instead of a binary which seems to become the default when setting bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.
This solution is IMHO 99.99% effective as it still uses the blocking readline
function, so we assume the sub process is nice and outputs complete lines.
I welcome feedback to improve the solution as I am still new to Python.
Upvotes: 1
Reputation: 451
This is a example to run interactive command in subprocess, and the stdout is interactive by using pseudo terminal. You can refer to: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Upvotes: 1
Reputation: 1204
This version of non-blocking read doesn't require special modules and will work out-of-the-box on majority of Linux distros.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
Upvotes: 4
Reputation: 48248
Adding this answer here since it provides ability to set non-blocking pipes on Windows and Unix.
All the ctypes
details are thanks to @techtonik's answer.
There is a slightly modified version to be used both on Unix and Windows systems.
This way you can use the same function and exception for Unix and Windows code.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).
Its a generator so you can for example...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Upvotes: 2
Reputation: 19
Here is a module that supports non-blocking reads and background writes in python:
https://pypi.python.org/pypi/python-nonblock
Provides a function,
nonblock_read which will read data from the stream, if available, otherwise return an empty string (or None if the stream is closed on the other side and all possible data has been read)
You may also consider the python-subprocess2 module,
https://pypi.python.org/pypi/python-subprocess2
which adds to the subprocess module. So on the object returned from "subprocess.Popen" is added an additional method, runInBackground. This starts a thread and returns an object which will automatically be populated as stuff is written to stdout/stderr, without blocking your main thread.
Enjoy!
Upvotes: -2
Reputation: 414835
Python 3.4 introduces new provisional API for asynchronous IO -- asyncio
module.
The approach is similar to twisted
-based answer by @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
There is a high-level interface asyncio.create_subprocess_exec()
that returns Process
objects that allows to read a line asynchroniosly using StreamReader.readline()
coroutine
(with async
/await
Python 3.5+ syntax):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
performs the following tasks:
Each step could be limited by timeout seconds if necessary.
Upvotes: 51
Reputation: 432
In my case I needed a logging module that catches the output from the background applications and augments it(adding time-stamps, colors, etc.).
I ended up with a background thread that does the actual I/O. Following code is only for POSIX platforms. I stripped non-essential parts.
If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Upvotes: 2
Reputation: 97
why bothering thread&queue? unlike readline(), BufferedReader.read1() wont block waiting for \r\n, it returns ASAP if there is any output coming in.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
Upvotes: 0
Reputation: 69
EDIT: This implementation still blocks. Use J.F.Sebastian's answer instead.
I tried the top answer, but the additional risk and maintenance of thread code was worrisome.
Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Upvotes: 0
Reputation: 1137
I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline()
blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline()
is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...
Upvotes: 91
Reputation: 31951
Working from J.F. Sebastian's answer, and several other sources, I've put together a simple subprocess manager. It provides the request non-blocking reading, as well as running several processes in parallel. It doesn't use any OS-specific call (that I'm aware) and thus should work anywhere.
It's available from pypi, so just pip install shelljob
. Refer to the project page for examples and full docs.
Upvotes: 0
Reputation: 301
Here is my code, used to catch every output from subprocess ASAP, including partial lines. It pumps at same time and stdout and stderr in almost correct order.
Tested and correctly worked on Python 2.7 linux & windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Upvotes: 8
Reputation: 6701
You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol
class, and override the outReceived()
method. Twisted (depending upon the reactor used) is usually just a big select()
loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived()
method is simply installing a callback for handling data coming from STDOUT
. A simple example demonstrating this behavior is as follows:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
The Twisted documentation has some good information on this.
If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn't built on top of Twisted, this isn't really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn't applicable for your particular application.
Upvotes: 17
Reputation: 2888
I have created a library based on J. F. Sebastian's solution. You can use it.
https://github.com/cenkalti/what
Upvotes: 0
Reputation: 636
Disclaimer: this works only for tornado
You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:
easy_install tornado_subprocess
now you can do something like this:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
you can also use it with a RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Upvotes: 7
Reputation: 69
I add this problem to read some subprocess.Popen stdout. Here is my non blocking read solution:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Upvotes: 6
Reputation: 22726
Try the asyncproc module. For example:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
The module takes care of all the threading as suggested by S.Lott.
Upvotes: 19