Naftuli Kay

Reputation: 91630

Solving process chaining in Python

I'm trying to write a utility which will seamlessly pipe an unbounded number of commands together in Python. This is what I've come up with so far, following the documentation on piping in subprocess:

from subprocess import Popen, PIPE, STDOUT

def taskchain(argset, stdout=STDOUT):
    lastprocess = None

    for index, args in enumerate(argset): # expected to be a list containing lists
        process = Popen(args, stdout=stdout if index == (len(argset)-1) else PIPE,
            stdin=None if lastprocess is None else lastprocess.stdout)

        if lastprocess is not None:
            lastprocess.stdout.close()

        lastprocess = process

    if stdout == PIPE:
        return lastprocess.communicate()[0]
    else:
        lastprocess.wait()

Note that I'm not using shell=True in order to hopefully avoid security concerns there.

Unfortunately, this doesn't work, as I'm getting:

OSError: [Errno 9] Bad file descriptor

I'm not sure what is failing. Can someone help me write a method to implement process chaining for an unbounded number of subprocesses?

(Use case is like this: taskchain([('ps', 'aux'), ('grep', 'python'), ('cut', '-b', '1')]).)

Upvotes: 3

Views: 174

Answers (2)

ecatmur

Reputation: 157354

stdout=stdout if argset.index(args) == (len(args) - 1) else PIPE)

This should probably be

stdout=stdout if argset.index(args) == (len(argset) - 1) else PIPE)

And you'd be better off using enumerate rather than argset.index.
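
To illustrate why enumerate is the safer choice here, a minimal sketch follows. The argument sets are hypothetical and chosen so that one stage appears twice; list.index then reports the first match rather than the current position.

```python
# Hypothetical argument sets; note the duplicated ('sort',) stage.
argset = [('sort',), ('uniq',), ('sort',)]

# list.index returns the position of the first equal element, so the
# final ('sort',) is reported as position 0 instead of 2.
by_index = [argset.index(args) for args in argset]

# enumerate yields the actual position of each element.
by_enumerate = [index for index, args in enumerate(argset)]

print(by_index)      # [0, 1, 0]
print(by_enumerate)  # [0, 1, 2]
```

With argset.index, a pipeline containing a repeated command would never detect its last stage correctly.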

Also, you need to connect the subprocesses to each other:

..., stdin=None if lastprocess is None else lastprocess.stdout)

Finally, STDOUT is only valid as an argument for the stderr parameter; to pass stdout through you should pass stdout=None.
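
As a rough illustration of that last point (Python 3 syntax), the sketch below passes STDOUT as the stderr argument so that both streams arrive on one pipe. The child command is a hypothetical stand-in that writes one line to each stream.

```python
import subprocess
import sys

# Hypothetical child: writes one line to stdout, then one line to stderr.
child = [sys.executable, '-c',
         "import sys; sys.stdout.write('out\\n'); sys.stdout.flush(); "
         "sys.stderr.write('err\\n')"]

# stderr=STDOUT redirects the child's stderr into the same pipe as its stdout.
proc = subprocess.Popen(child, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
merged, _ = proc.communicate()

# Both lines are captured together; stdout=None would instead let the child
# inherit the parent's stdout and nothing would be captured.
print(merged.split())
```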

Upvotes: 2

Naftuli Kay

Reputation: 91630

With @ecatmur's help above, here's the final solution:

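from subprocess import Popen, PIPE, STDOUT

def taskchain(*args, **kwargs):
    # Each positional argument is one command, given as a sequence of strings.
    output = kwargs.get('output', PIPE)
    error_output = kwargs.get('error_output', STDOUT)
    lastprocess = None

    for index, argset in enumerate(args):
        islastprocess = index == len(args) - 1

        # Intermediate processes always write to a pipe; only the last uses `output`.
        process = Popen(argset, stdout=output if islastprocess else PIPE,
                stdin=None if lastprocess is None else lastprocess.stdout,
                stderr=error_output)

        if lastprocess is not None:
            # Close the parent's copy so the earlier process sees SIGPIPE
            # if the later one exits first.
            lastprocess.stdout.close()

        lastprocess = process

    if output == PIPE:
        # Returns a (stdout, stderr) tuple.
        return lastprocess.communicate()
    else:
        lastprocess.wait()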

The processes are chained together, emulating shell-style piping without the security risks of shell=True!

Examples:

>>> print taskchain(('ps','aux'), ('grep','python'), ('cut','-c','1-50'))[0].strip()
naftuli   3221  0.1  0.1  60960 12376 pts/3    S+ 
naftuli   3242  0.0  0.0  32592  5748 pts/2    S+ 
naftuli   3246  0.0  0.0   9388   916 pts/2    S+ 
naftuli   5852  0.0  0.2 430284 20200 ?        Sl 
root      8153  0.0  0.1  95520 11060 ?        S

>>> print taskchain(('ls',), ('grep', 'ic'))[0]
Music
Pictures
Public
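
For anyone on Python 3, here is a minimal sketch of the same idea. It is not the code above: it always captures the last process's stdout, returns it as text, and also waits on the earlier processes so they are reaped. The parameter name commands and the producer and consumer commands are hypothetical, and the Python interpreter stands in for ls and grep so the example does not depend on particular system tools.

```python
import sys
from subprocess import Popen, PIPE

def taskchain(*commands):
    """Run commands as a pipeline and return the last one's stdout as text."""
    if not commands:
        raise ValueError("at least one command is required")

    processes = []
    for command in commands:
        previous = processes[-1] if processes else None
        process = Popen(
            command,
            stdin=None if previous is None else previous.stdout,
            stdout=PIPE,
            universal_newlines=True,  # decode the captured output to str
        )
        if previous is not None:
            # Close the parent's copy of the pipe so the earlier process
            # is notified if the later one exits first.
            previous.stdout.close()
        processes.append(process)

    output, _ = processes[-1].communicate()
    # Reap the earlier processes as well (an addition in this sketch).
    for process in processes[:-1]:
        process.wait()
    return output

# Hypothetical stand-ins for `ls` and `grep ic`.
producer = [sys.executable, '-c',
            "print('Music'); print('Videos'); print('Pictures')"]
consumer = [sys.executable, '-c',
            "import sys; sys.stdout.writelines(l for l in sys.stdin if 'ic' in l)"]

result = taskchain(producer, consumer)
print(result)
```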

Upvotes: 0
