Reputation: 14078
I need to launch a subprocess and start two threads that read its stdout and stderr, respectively.
The following code only considers stdout:
def reader(rfd):
    while True:
        try:
            data = os.read(rfd, bufsize)
        except OSError:
            break
        else:
            chomp(data)

rout, wout = os.pipe()
tout = threading.Thread(target=reader, args=(rout,))
tout.start()
subprocess.check_call(command, bufsize=bufsize, stdout=wout, stderr=werr)
os.close(wout)
os.close(rout)
tout.join()
The code works, except that not all data is processed, as if the os.close(wout) call kills the reader before all data is read. On the other hand, if I don't close wout, my process hangs forever on tout.join().
I can tell this is a buffering problem because if I put a very bad time.sleep(0.1) just after subprocess.check_call(...), everything magically works.
The good way would be flushing instead of waiting, but any call to os.fsync() on a pipe raises OSError: [Errno 22] Invalid argument.
Any hint about how to flush a pipe created with os.pipe?
Upvotes: 2
Views: 6455
Reputation: 40843
I would recommend using Popen rather than os.pipe for interprocess communication.
E.g.:
writer_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
reader_thread = threading.Thread(target=reader, args=(writer_process.stdout,))
reader_thread.start()
reader_thread.join()
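As a fuller sketch of the Popen approach, covering both stdout and stderr as the question asks: the child command, the `chunks` lists, and the `reader` signature here are made up for illustration, not taken from the question. Each thread reads until `read()` returns an empty bytes object, which signals EOF once the child has closed that stream.

```python
import subprocess
import sys
import threading

def reader(stream, chunks, bufsize=4096):
    # Read until EOF; read() returns b"" once the child closes its end.
    while True:
        data = stream.read(bufsize)
        if not data:
            break
        chunks.append(data)
    stream.close()

# Hypothetical child command that writes to both streams.
cmd = [sys.executable, "-c",
       "import sys; sys.stdout.write('out' * 1000); sys.stderr.write('err' * 1000)"]

out_chunks, err_chunks = [], []
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
threads = [
    threading.Thread(target=reader, args=(proc.stdout, out_chunks)),
    threading.Thread(target=reader, args=(proc.stderr, err_chunks)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # both readers have reached EOF here
returncode = proc.wait()

stdout_data = b"".join(out_chunks)
stderr_data = b"".join(err_chunks)
```

Because Popen owns the pipes, there is no write end left open in the parent, so no manual os.close() calls are needed for the readers to see EOF.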
However, if you really want to use os.pipe, you will have an easier time treating the descriptors like file objects. Python's built-in file context manager will make sure the files are flushed and closed appropriately.
E.g.:
def reader(fd):
    # In Python 3, os.fdopen() takes `buffering`, not `bufsize`.
    with os.fdopen(fd, buffering=bufsize) as f:
        while True:
            data = f.read(bufsize)
            if not data:  # an empty read means EOF
                break
            chomp(data)
and
with os.fdopen(wout, "w", buffering=bufsize) as f:
    subprocess.check_call(cmd, stdout=f)
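Putting the two fragments together, a minimal end-to-end sketch might look like the following. It assumes Python 3, opens both ends in binary mode, and uses a made-up child command and a `received` list in place of `chomp`; none of these come from the question.

```python
import os
import subprocess
import sys
import threading

bufsize = 4096
received = []  # stands in for chomp(); collects everything the reader sees

def reader(fd):
    # The with block closes the read end once the loop finishes.
    with os.fdopen(fd, "rb", buffering=bufsize) as f:
        while True:
            data = f.read(bufsize)
            if not data:  # b"" means EOF
                break
            received.append(data)

rout, wout = os.pipe()
thread = threading.Thread(target=reader, args=(rout,))
thread.start()

# Hypothetical child command, used only for illustration.
cmd = [sys.executable, "-c", "print('hello from child')"]
with os.fdopen(wout, "wb", buffering=0) as f:
    subprocess.check_call(cmd, stdout=f)
# Leaving the with block closed the parent's write end, so the reader gets EOF.

thread.join()
output = b"".join(received)
```

The important ordering is that the write end is closed only after check_call returns, and the read end is closed by the reader itself once it has seen EOF.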
Upvotes: 2
Reputation: 23332
You can't close the pipes until you're sure the process is done writing (because you'll lose data), and you can't wait for the thread to finish without closing the pipes (because os.read will block forever).
You need to wait for the process to complete and then close the writing end of the pipe manually (since you created it).
Here's a self-contained example:
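import os
import subprocess
import threading

bufsize = 100

def reader(rfd):
    while True:
        try:
            data = os.read(rfd, bufsize)
        except OSError:
            break
        if not data:
            break  # reached EOF: every write end of the pipe is closed

rout, wout = os.pipe()
rerr, werr = os.pipe()
tout = threading.Thread(target=reader, args=(rout,))
terr = threading.Thread(target=reader, args=(rerr,))  # drain stderr too, so the child cannot block on a full pipe
tout.start()
terr.start()
p = subprocess.Popen("ls", bufsize=bufsize, stdout=wout, stderr=werr)
p.wait()  # wait for the process to finish writing
os.close(wout)  # closing our write ends lets the readers see EOF
os.close(werr)
tout.join()
terr.join()
os.close(rout)  # close the read ends only after the readers are done
os.close(rerr)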
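To illustrate why this ordering is safe, here is a small sketch without any subprocess. Bytes written with os.write go straight into the kernel's pipe buffer, so there is nothing to flush, and they stay readable after the write end is closed. The variable names are illustrative only.

```python
import os

rfd, wfd = os.pipe()
os.write(wfd, b"pending data")  # goes straight into the kernel pipe buffer
os.close(wfd)                   # last write end closed; the data stays readable

first = os.read(rfd, 100)   # returns the pending bytes
second = os.read(rfd, 100)  # returns b"", which signals EOF
os.close(rfd)
```

Data is lost only if the read end is closed before the reader has drained the pipe, which is why the read ends are closed after the join() calls.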
Upvotes: 1