Dacav

Reputation: 14078

Flushing a pipe (os.pipe) before closing

I need to launch a subprocess and start two threads that read its stdout and stderr, respectively.

The following code only handles stdout:

import os
import subprocess
import threading

# bufsize, chomp, command and werr are defined elsewhere
def reader(rfd):
    while True:
        try:
            data = os.read(rfd, bufsize)
        except OSError:
            break
        else:
            chomp(data)

rout, wout = os.pipe()
tout = threading.Thread(target=reader, args=(rout,))
tout.start()

subprocess.check_call(command, bufsize=bufsize, stdout=wout, stderr=werr)

os.close(wout)
os.close(rout)
tout.join()

The code works, except that not all data gets processed, as if os.close(wout) kills the reader before all the data has been read. On the other hand, if I don't close wout, my process hangs forever on tout.join().

I suspect this is a buffering problem, because if I add a very hacky time.sleep(0.1) right after subprocess.check_call(...), everything magically works.

The proper fix would be to flush instead of waiting, but any call to os.fsync() on a pipe raises OSError: [Errno 22] Invalid argument.

Any hints on how to flush a pipe created with os.pipe?

Upvotes: 2

Views: 6455

Answers (2)

Dunes

Reputation: 40843

I would recommend using Popen rather than os.pipe for interprocess communication.

e.g.

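import subprocess
import threading

writer_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
# here reader() receives a file object, so it must call .read() on it
reader_thread = threading.Thread(target=reader, args=(writer_process.stdout,))
reader_thread.start()
reader_thread.join()
writer_process.wait()  # reap the child and collect its exit status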
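Extending that idea to the original requirement (one thread per stream), a minimal sketch could look like the following. The command is a hypothetical stand-in that writes one line to each stream, and the 4096-byte chunk size is arbitrary. It uses read1() on the pipe's buffered reader so that each chunk is handed over as soon as it arrives, with b"" signalling EOF.

```python
import subprocess
import sys
import threading


def reader(stream, chunks):
    # read1() returns whatever is available (at most 4096 bytes) and
    # returns b"" only at EOF, i.e. once the child's end is closed.
    for data in iter(lambda: stream.read1(4096), b""):
        chunks.append(data)
    stream.close()


# Hypothetical child process: one line to stdout, one line to stderr.
cmd = [sys.executable, "-c",
       "import sys; print('out'); print('err', file=sys.stderr)"]

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out_chunks, err_chunks = [], []
threads = [
    threading.Thread(target=reader, args=(proc.stdout, out_chunks)),
    threading.Thread(target=reader, args=(proc.stderr, err_chunks)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # each reader returns once the child closes its end
proc.wait()  # reap the child and collect its exit status

stdout_data = b"".join(out_chunks)
stderr_data = b"".join(err_chunks)
```

Because Popen owns both ends of each pipe, there are no parent-side write ends to close by hand; EOF arrives as soon as the child exits.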

However, if you really want to use os.pipe, you will have an easier time treating the descriptors as file objects. Python's built-in file context manager will make sure the files are flushed and closed appropriately.

e.g.

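def reader(fd):
    # Python 3: os.fdopen() forwards to open(), whose buffer-size
    # parameter is named "buffering", not "bufsize"
    with os.fdopen(fd, "rb", buffering=bufsize) as f:
        while True:
            data = f.read(bufsize)
            if not data:
                break  # EOF: all write ends are closed
            chomp(data)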

and

with os.fdopen(wout, "wb", buffering=bufsize) as f:
    subprocess.check_call(cmd, stdout=f)
# leaving the block closes wout, so the reader sees EOF; then join it
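Put together, the two fragments above could form a runnable script along these lines. This is a sketch under assumptions: chomp() is a hypothetical stand-in that just collects chunks, bufsize is an illustrative value, and the command is a hypothetical child that prints one line. The read end is opened unbuffered (buffering=0) so that each read() returns as soon as some data is available.

```python
import os
import subprocess
import sys
import threading

bufsize = 4096  # illustrative chunk size
received = []


def chomp(data):
    # Hypothetical stand-in for the real per-chunk processing.
    received.append(data)


def reader(fd):
    # The read end is wrapped in a file object and closed when the block exits.
    with os.fdopen(fd, "rb", buffering=0) as f:
        while True:
            data = f.read(bufsize)
            if not data:  # b"" means every write end is closed
                break
            chomp(data)


rout, wout = os.pipe()
tout = threading.Thread(target=reader, args=(rout,))
tout.start()

# Hypothetical command; any program that writes to stdout works.
cmd = [sys.executable, "-c", "print('hello from the child')"]
with os.fdopen(wout, "wb") as f:
    subprocess.check_call(cmd, stdout=f)
# Leaving the block closed wout, so the reader now sees EOF.
tout.join()

output = b"".join(received)
```

Note the ordering: the write end is closed only after check_call() has returned (the child is done), and the read end is closed by the reader itself after it hits EOF, so no buffered data is thrown away.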

Upvotes: 2

loopbackbee

Reputation: 23332

You can't close the pipes until you're sure the process is done writing (because you'll lose data), and you can't wait for the thread to finish without closing the pipes (because os.read will block forever).

You need to wait for process completion, and close the writing end of the pipe manually (since you created it).
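Both constraints follow from how pipes signal end of file. A minimal single-process sketch (no subprocess involved) shows that bytes passed to os.write() are readable immediately, so there is no user-space buffer for fsync() to flush, and that os.read() returns b"" only after the last write end has been closed.

```python
import os

rfd, wfd = os.pipe()

# os.write() hands the bytes straight to the kernel's pipe buffer;
# there is no user-space buffer here for fsync() or flush() to drain.
os.write(wfd, b"some output")
first = os.read(rfd, 100)  # available immediately, no flush needed

# With the write end still open, another os.read() would block forever.
# Closing the (last) write end is what turns the next read into EOF.
os.close(wfd)
second = os.read(rfd, 100)  # b"": end of file

os.close(rfd)  # close the read end only once reading is finished
```

In the question's code, os.close(rout) runs while the reader may still have data left in the pipe, and the reader never checks for b"", which explains both the lost data and the need for time.sleep().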

Here's a self-contained example:

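import os
import threading
import subprocess

bufsize = 100

def reader(rfd, chunks):
    while True:
        try:
            data = os.read(rfd, bufsize)
        except OSError:
            break
        if not data:
            break  # reached EOF: all write ends are closed
        chunks.append(data)  # process the data here

rout, wout = os.pipe()
rerr, werr = os.pipe()

# one reader per stream, so a chatty stderr cannot fill its pipe and block the child
out_chunks, err_chunks = [], []
tout = threading.Thread(target=reader, args=(rout, out_chunks))
terr = threading.Thread(target=reader, args=(rerr, err_chunks))
tout.start()
terr.start()

p = subprocess.Popen("ls", stdout=wout, stderr=werr)
p.wait()  # wait for the process to finish writing
os.close(wout)  # close our write ends so the readers see EOF
os.close(werr)
tout.join()
terr.join()
os.close(rout)  # close the read ends only after the readers are done
os.close(rerr)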

Upvotes: 1
