init6

Reputation: 49

asynchronous subprocess Popen python 3.5

I am trying to asynchronously run the Popen command from subprocess, so that I can run other stuff in the background.

import subprocess
import requests
import asyncio
import asyncio.subprocess

async def x(message):
    if len(message.content.split()) > 1:
        #output = asyncio.create_subprocess_shell(message.content[3:], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
        output = subprocess.Popen(message.content[3:], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
        return output.communicate()[0].decode('utf-8')

I have tried to understand https://docs.python.org/3/library/asyncio-subprocess.html but I am not sure what a protocol factory is.

Upvotes: 3

Views: 13109

Answers (5)

T.SURESH ARUNACHALAM

Reputation: 285

Using async/await is the right way to go.

Tested on Python 3.x (Windows, macOS).

import asyncio
import signal
import sys
from asyncio.subprocess import PIPE, STDOUT


def signal_handler(signum, frame):
    # Stop the event loop and exit when Ctrl+C is pressed.
    loop.stop()
    sys.exit(0)


async def run_async():
    cmd = 'sudo long_running_cmd --opt1=AAAA --opt2=BBBB'

    print("[INFO] Starting script...")
    # Start the command without blocking the event loop...
    process = await asyncio.create_subprocess_shell(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    # ...then wait for it to exit; other tasks keep running in the meantime.
    await process.wait()
    print("[INFO] Script is complete.")


loop = asyncio.get_event_loop()
signal.signal(signal.SIGINT, signal_handler)
tasks = [loop.create_task(run_async())]
wait_tasks = asyncio.wait(tasks)
loop.run_until_complete(wait_tasks)

loop.close()

Core logic:

process = await asyncio.create_subprocess_shell(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
await process.wait()
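
To get the command's output back as text, which is what the question asks for, the core logic above can use communicate() instead of wait(). This is only a minimal sketch: it assumes Python 3.7+ (for asyncio.run), and the names run_command and ticker are illustrative and do not come from the original post.

```python
import asyncio
from asyncio.subprocess import PIPE, STDOUT


async def run_command(cmd):
    """Run cmd in a shell and return its combined stdout/stderr as text."""
    process = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=STDOUT)
    # communicate() reads the pipe until EOF and waits for the process to
    # exit, so a child that prints a lot cannot stall on a full pipe.
    output, _ = await process.communicate()
    return output.decode('utf-8')


async def ticker(n):
    """Hypothetical stand-in for the 'other stuff' running at the same time."""
    for _ in range(n):
        await asyncio.sleep(0.1)
    return n


async def main():
    # Both coroutines make progress concurrently on the same event loop.
    return await asyncio.gather(run_command('echo hello'), ticker(3))


if __name__ == '__main__':
    output, ticks = asyncio.run(main())
    print(output.strip(), ticks)
```

In a coroutine such as the x(message) handler from the question, the call would become `return await run_command(message.content[3:])`.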

Upvotes: 3

Roman Susi

Reputation: 4199

When I came to this question, I expected the answer to really use asyncio for interprocess communication.

I have found the following resource useful: https://github.com/python/asyncio/blob/master/examples/child_process.py

and below is my simplified example (using 3.5+ async/await syntax), which reads lines and outputs them sorted:

import asyncio

from subprocess import Popen, PIPE


async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe"""
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport


async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream"""
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport


async def main(loop):
    # start subprocess and wrap stdin, stdout, stderr
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)

    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    # interact with subprocess
    name = {stdout: 'OUT', stderr: 'ERR'}
    registered = {
        asyncio.Task(stderr.read()): stderr,
        asyncio.Task(stdout.read()): stdout
    }

    to_sort = b"one\ntwo\nthree\n"
    stdin.write(to_sort)
    stdin.close()  # this way we tell we do not have anything else

    # get and print lines from stdout, stderr
    timeout = None
    while registered:
        done, pending = await asyncio.wait(
            registered, timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED)
        if not done:
            break
        for f in done:
            stream = registered.pop(f)
            res = f.result()
            if res != b'':
                print(name[stream], res.decode('ascii').rstrip())
                registered[asyncio.Task(stream.read())] = stream
        timeout = 0.0

    stdout_transport.close()
    stderr_transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

NB: without taking special measures, the amount of data to be written into the pipe is limited. In my system it was possible to write just over 700000 bytes before using up pipe buffers.
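
One such measure is to let asyncio feed stdin and drain stdout at the same time. The sketch below is not taken from the linked example: it assumes Python 3.7+ (for asyncio.run) and uses the high-level create_subprocess_exec API, whose communicate() method writes the input, closes stdin, and reads the output concurrently. The child is a small Python one-liner standing in for /usr/bin/sort so that the sketch does not depend on the platform; sort_lines and SORT_SCRIPT are illustrative names.

```python
import asyncio
import sys
from asyncio.subprocess import PIPE

# Stand-in for /usr/bin/sort: reads all lines from stdin, writes them sorted.
SORT_SCRIPT = "import sys; sys.stdout.writelines(sorted(sys.stdin.readlines()))"


async def sort_lines(data):
    """Send data (bytes) to the child's stdin and return its stdout."""
    process = await asyncio.create_subprocess_exec(
        sys.executable, '-c', SORT_SCRIPT, stdin=PIPE, stdout=PIPE)
    # communicate() writes stdin and reads stdout concurrently, so the
    # amount of input is not limited by the size of the pipe buffers.
    out, _ = await process.communicate(input=data)
    return out


if __name__ == '__main__':
    # About 1.4 MB of input, well beyond the limit mentioned above.
    payload = b''.join(b'%06d\n' % i for i in range(200000, 0, -1))
    result = asyncio.run(sort_lines(payload))
    print(len(result.split()), 'lines sorted')
```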

There are also other examples there, using create_subprocess_shell.
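
On the question's point about protocol factories: the factory() function in my example is one. A protocol factory is simply a callable that takes no arguments and returns a protocol instance; the event loop calls it when it sets up the connection. The low-level loop.subprocess_exec() call works the same way. Below is a rough sketch, assuming Python 3.7+; CollectOutput and run_with_protocol are made-up names for illustration.

```python
import asyncio
import sys


class CollectOutput(asyncio.SubprocessProtocol):
    """Hypothetical protocol that accumulates the child's stdout."""

    def __init__(self, done):
        self.done = done  # future resolved with the collected bytes
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_data_received(self, fd, data):
        # Called by the event loop whenever the child writes to a pipe.
        self.output.extend(data)

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self._finish_if_ready()

    def process_exited(self):
        self.exited = True
        self._finish_if_ready()

    def _finish_if_ready(self):
        # The two callbacks can arrive in either order, so wait for both.
        if self.pipe_closed and self.exited and not self.done.done():
            self.done.set_result(bytes(self.output))


async def run_with_protocol(args):
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # The first argument is the protocol factory: the loop calls it once,
    # with no arguments, to create the protocol instance for this child.
    transport, _ = await loop.subprocess_exec(
        lambda: CollectOutput(done), *args, stdin=None, stderr=None)
    try:
        return await done
    finally:
        transport.close()


if __name__ == '__main__':
    out = asyncio.run(run_with_protocol([sys.executable, '-c', 'print("hello")']))
    print(out.decode().strip())
```

The high-level create_subprocess_exec and create_subprocess_shell functions hide this step, so code that only needs the output does not have to write a protocol at all.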

I have not yet used asyncio in real projects, so suggestions for improvement in the comments are welcome.

Upvotes: 2

init6

Reputation: 49

I eventually found the answer to my question, which utilizes async. http://pastebin.com/Zj8SK1CG

Upvotes: 0

Riccardo Petraglia

Reputation: 2013

This one is much simpler. I found it after posting my other reply, which may still be of interest, so I left that one in place.

import time
import subprocess
import shlex
from sys import stdout


command = 'time sleep 5' # Here I used the 'time' only to have some output

def x(command):
    cmd = shlex.split(command)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p

# Start the subprocess and do something else...
p = x(command)
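# ...for example, count the seconds in the meantime...

try:  # This takes care of killing the subprocess if problems occur
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        if p.poll() is not None:  # poll() returns None while the process is still running
            print(p.communicate())
            break
except BaseException:
    p.terminate()  # or p.kill()
    raise  # re-raise so that the error (or Ctrl+C) is not silently swallowed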

The asynchronous behaviour is evident from the fact that the Python script keeps printing the counter value to stdout while the background process runs the sleep command. The script exits after about 5 seconds, having printed the counter in the meantime and then the output of the time command, which shows that it works.

Upvotes: -1

Riccardo Petraglia

Reputation: 2013

Tested with python 3.5. Just ask if you have questions.

import threading
import time
import subprocess
import shlex
from sys import stdout


# Threads share the memory of the process. Let's use a class as communicator
# to keep the shared data in one place (there could be problems if you have
# more than a single thread)
class Communicator(object):
    counter = 0
    stop = False
    arg = None
    result = None

# Here we can define what you want to do. There are other methods to do that
# but this is the one I prefer.
class ThreadedFunction(threading.Thread):

    def run(self, *args, **kwargs):
        super().run()
        command = c.arg

        # Here what you want to do...
        command = shlex.split(command)
        print(time.time()) # this is just to check that the command (sleep 5) is executed
        output = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        print('\n',time.time())
        c.result = output
        if c.stop: return None # This is useful only within loops within threads

# Create a class instance
c = Communicator()
c.arg = 'time sleep 5' # Here I used the 'time' only to have some output

# Create the thread and start it
t = ThreadedFunction()
t.start() # Start the thread and do something else...

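# ...for example, count the seconds in the meantime...
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result is not None:  # set by the thread once the command has finished
            print(c.result)
            break
except BaseException:
    c.stop = True
    raise  # re-raise so that the error (or Ctrl+C) is not silently swallowed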
Upvotes: 0
