Andy Hayden
Andy Hayden

Reputation: 375475

Capture output as a tty in python

I have a executable which requires a tty (as stdin and stderr), and want to be able to test it. I want to input stdin, and capture the output of stdout and stderr, here's an example script:

# test.py
import sys
print("stdin: {}".format(sys.stdin.isatty()))
print("stdout: {}".format(sys.stdout.isatty()))
print("stderr: {}".format(sys.stderr.isatty()))
sys.stdout.flush()
line = sys.stdin.readline()
sys.stderr.write("read from stdin: {}".format(line))
sys.stderr.flush()

I can run this without tty, but that gets caught by .isatty and each return False:

import subprocess
p = subprocess.Popen(["python", "test.py"], stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
p.stdin.write(b"abc\n")
print(p.communicate())
# (b'stdin: False\nstdout: False\nstderr: False\n', b'read from stdin: abc\n')

I want to capture the stdout and stderr and have all three return True - as a tty.

I can use pty to make a tty stdin:

import subprocess
m, s = pty.openpty()
p = subprocess.Popen(["python", "test.py"], stdin=s, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdin = os.fdopen(m, 'wb', 0)
os.close(s)
stdin.write(b"abc\n")
(stdout, stderr) = p.communicate()
stdin.close()
print((stdout, stderr))
# (b'stdin: True\nstdout: False\nstderr: False\n', b'read from stdin: abc\n')

I've tried a bunch of permutations to make stdout and stderr tty to no avail.
The output I want here is:

(b'stdin: True\nstdout: True\nstderr: True\n', b'read from stdin: abc\n')

Upvotes: 10

Views: 5367

Answers (2)

casper.dcl
casper.dcl

Reputation: 14779

Just wanted to add a readline iterator (plus graceful shutdown) version of @unutbu's answer:

import errno
import os
import pty
import signal
import subprocess


def subprocess_tty(cmd, encoding="utf-8", timeout=10, **kwargs):
    """`subprocess.Popen` yielding stdout lines acting as a TTY"""
    m, s = pty.openpty()
    p = subprocess.Popen(cmd, stdout=s, stderr=s, **kwargs)
    os.close(s)

    try:
        for line in open(m, encoding=encoding):
            if not line:  # EOF
                break
            yield line
    except OSError as e:
        if errno.EIO != e.errno:  # EIO also means EOF
            raise
    finally:
        if p.poll() is None:
            p.send_signal(signal.SIGINT)
            try:
                p.wait(timeout)
            except subprocess.TimeoutExpired:
                p.terminate()
                try:
                    p.wait(timeout)
                except subprocess.TimeoutExpired:
                    p.kill()
        p.wait()

Example usage:

import textwrap

for line in subprocess_tty(
    [
        "python",
        "-c",
        textwrap.dedent(
            """\
            import sys
            print(sys.stdin.isatty())
            print(sys.stdout.isatty())
            print(sys.stderr.isatty())
            """
        ),
    ]
):
    print(f"{line!r}")

Upvotes: 1

unutbu
unutbu

Reputation: 879451

The code below is based on jfs' answers here and here, plus your idea of using 3 pseudo-terminals to distinguish stdout, stderr and stdin (though note there is a cryptic warning that something may go wrong (such as a possibly truncated stderr on OSX?) by doing so).

Also note that, as of Python 3.10, the docs say pty is tested on Linux, macOS, and FreeBSD, though it is "supposed to work" for other POSIX platforms:

import errno
import os
import pty
import select
import subprocess

def tty_capture(cmd, bytes_input):
    """Capture the output of cmd with bytes_input to stdin,
    with stdin, stdout and stderr as TTYs.

    Based on Andy Hayden's gist:
    https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e
    """
    mo, so = pty.openpty()  # provide tty to enable line-buffering
    me, se = pty.openpty()  
    mi, si = pty.openpty()  

    p = subprocess.Popen(
        cmd,
        bufsize=1, stdin=si, stdout=so, stderr=se, 
        close_fds=True)
    for fd in [so, se, si]:
        os.close(fd)
    os.write(mi, bytes_input)

    timeout = 0.04  # seconds
    readable = [mo, me]
    result = {mo: b'', me: b''}
    try:
        while readable:
            ready, _, _ = select.select(readable, [], [], timeout)
            for fd in ready:
                try:
                    data = os.read(fd, 512)
                except OSError as e:
                    if e.errno != errno.EIO:
                        raise
                    # EIO means EOF on some systems
                    readable.remove(fd)
                else:
                    if not data: # EOF
                        readable.remove(fd)
                    result[fd] += data

    finally:
        for fd in [mo, me, mi]:
            os.close(fd)
        if p.poll() is None:
            p.kill()
        p.wait()

    return result[mo], result[me]

out, err = tty_capture(["python", "test.py"], b"abc\n")
print((out, err))

yields

(b'stdin: True\r\nstdout: True\r\nstderr: True\r\n', b'read from stdin: abc\r\n')

Upvotes: 5

Related Questions