Alan.G

Reputation: 113

Timing out subprocesses in Python

import subprocess

def adbshell(command, serial=None, adbpath='adb'):
    args = [adbpath]
    if serial is not None:
        args.extend(['-s', serial])
    args.extend(['shell', command])
    return subprocess.check_output(args)



def pmpath(serial=None, adbpath='adb'):
    return adbshell('am instrument -e class............', serial=serial, adbpath=adbpath)

I need to run this test for a limited time and then exit if it has not finished. How do I provide a timeout?

Upvotes: 1

Views: 588

Answers (2)

nu11p01n73R

Reputation: 26667

The Popen constructor provides more flexibility, since the object it returns can be used to check the exit status of the subprocess.

Popen.poll() returns None if the process has not terminated yet. So start the subprocess, sleep for the required timeout, and then check poll().

Consider a simple test.py, which is the subprocess called from the main program:

import time

for i in range(10):
    print(i)
    time.sleep(2)

test.py is then called from another program using subprocess.Popen:

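from subprocess import Popen, PIPE
import time

cmd = Popen(['python', 'test.py'], stdout=PIPE)
print(cmd.poll())        # None while the child is still running
time.sleep(2)            # the timeout: let the child run for 2 seconds
if cmd.poll() is None:   # still running after the timeout
    print("killing")
    cmd.terminate()

time.sleep(2)            # give the child time to exit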

This gives the program a timeout of 2 seconds in which to execute. After that, the exit status is checked using Popen.poll(); if it is None, the process has not terminated, so it is killed.
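
A variation on the same idea, as a minimal sketch: poll in a loop until a deadline instead of sleeping for the whole timeout, so a process that finishes early returns right away. The helper name run_with_deadline and its parameters are made up for illustration; only Popen, poll(), kill() and wait() come from the subprocess module.

```python
import subprocess
import sys
import time


def run_with_deadline(args, timeout, interval=0.05):
    """Run args; return the exit code, or None if the child was killed.

    Hypothetical helper, not part of the subprocess module.
    """
    proc = subprocess.Popen(args)
    deadline = time.time() + timeout
    while proc.poll() is None:       # None means the child is still running
        if time.time() >= deadline:
            proc.kill()              # deadline passed: stop the child
            proc.wait()              # reap it so no zombie is left behind
            return None
        time.sleep(interval)         # avoid a busy loop between checks
    return proc.returncode


if __name__ == '__main__':
    # A child that sleeps longer than the deadline gets killed.
    sleeper = [sys.executable, '-c', 'import time; time.sleep(5)']
    print(run_with_deadline(sleeper, timeout=1))
```

The polling interval is a trade-off: a smaller value notices an early exit sooner but wakes the parent more often.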

Upvotes: 1

sk11

Reputation: 1824

It depends on which Python version you are running.

Python 3.3 onwards:

subprocess.check_output() provides a timeout parameter. Its signature is:

subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)
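
As a rough sketch of how that parameter might be used on Python 3.3+: when the timeout expires, check_output() kills the child and raises subprocess.TimeoutExpired, which you can catch. The wrapper name check_output_or_none is made up for illustration.

```python
import subprocess
import sys


def check_output_or_none(args, timeout):
    """Return the command's output, or None if it exceeded timeout seconds.

    Hypothetical wrapper; requires Python 3.3 or later.
    """
    try:
        return subprocess.check_output(args, timeout=timeout)
    except subprocess.TimeoutExpired:
        # check_output() has already killed and waited for the child here.
        return None


if __name__ == '__main__':
    sleeper = [sys.executable, '-c', 'import time; time.sleep(5)']
    print(check_output_or_none(sleeper, timeout=1))
```

In the question's adbshell(), the equivalent change would be to pass a timeout through to subprocess.check_output(args, timeout=...) and decide how the caller should handle TimeoutExpired.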

Below Python 3.3:

You can use the threading module. Something like:

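import subprocess
import threading

def run(args, timeout):
    def target():
        print('Start thread')
        subprocess.check_output(args)
        print('End thread')

    thread = threading.Thread(target=target)
    thread.start()  # Start executing target() in the background

    # Wait at most `timeout` seconds for the thread to finish.
    # Note: this only stops waiting; it does not kill the subprocess.
    thread.join(timeout)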

Note: I haven't tested the code above with threading and check_output(). Normally I use subprocess.Popen(), which offers more flexibility and handles almost all scenarios. See the subprocess documentation.
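
For what it's worth, here is a sketch of the Popen() route for versions below 3.3, assuming a threading.Timer is an acceptable way to enforce the limit: the timer kills the child if it is still running when the timeout elapses. The function name check_output_with_timer and its return shape are my own choices, not an existing API.

```python
import subprocess
import sys
import threading


def check_output_with_timer(args, timeout):
    """Return (output, timed_out) for the command in args.

    Hypothetical helper built on Popen and threading.Timer.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    timed_out = []

    def on_timeout():
        timed_out.append(True)   # remember that the limit was hit
        proc.kill()              # stop the child so communicate() returns

    timer = threading.Timer(timeout, on_timeout)
    timer.start()
    try:
        output, _ = proc.communicate()   # blocks until the child exits
    finally:
        timer.cancel()                   # no-op if the timer already fired
    return output, bool(timed_out)


if __name__ == '__main__':
    sleeper = [sys.executable, '-c', 'import time; time.sleep(5)']
    print(check_output_with_timer(sleeper, timeout=1))
```

Unlike the thread.join(timeout) approach, this one actually terminates the child, because the code holds the Popen object and can call kill() on it.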

Upvotes: 1
