Hari Kallae

Reputation: 138

Python 3.4 concurrent.futures.Executor doesn't give control to pause and resume threads

I am working with concurrent.futures.ThreadPoolExecutor for multithreading. I am executing a few HTTP services, and I want control over the threads so that I can pause execution when the server goes down, start the server, and then resume execution.

The trigger for detecting that the server is down is the presence of a file at a particular location; if that file exists, I have to pause the execution.

According to the documentation, concurrent.futures.Executor.shutdown() will signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

But when I use the shutdown() method of the executor, it does not shut down the threads immediately; it only shuts down after the entire execution has finished.

In fact, I am calling the shutdown() method only because I couldn't find pause and resume in concurrent.futures. As an alternative, I remove each URL from the list once its thread finishes execution, so that I can pass the remaining list and call the same method again.
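To illustrate, here is a minimal standalone sketch (separate from my real code) showing that shutdown() only returns after the already-submitted tasks have finished:

import time
import concurrent.futures

def slow_task(n):
    time.sleep(2)  # stands in for one of my slow HTTP calls
    return n

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(slow_task, i) for i in range(4)]
start = time.time()
executor.shutdown()  # blocks until all four tasks complete (~4 seconds); nothing is interrupted
print("shutdown returned after %.1f seconds" % (time.time() - start))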

Here is the code:

import concurrent.futures
import urllib.request
import os.path
import datetime
import sys
import pathlib
from errno import ENOENT, EACCES, EPERM
import time
import threading
import settings  # project-specific configuration module providing `configuration`

listOfFilesFromDirectory =  []
webroot = settings.configuration.WEBSERVER_WEBROOT
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT
shutdown = False

# populate the list with the URLs from a file
def triggerMethod(path):
    try:
        for line in open(path):
            listOfFilesFromDirectory.append(line.strip())  # strip the trailing newline
    except IOError as err:
        if err.errno == ENOENT:
            #logging.critical("document.txt file is missing")
            print("document.txt file is missing")
        elif err.errno in (EACCES, EPERM):
            #logging.critical("You are not allowed to read document.txt")
            print("You are not allowed to read document.txt")
        else:
            raise   

# called to stop the executor and restart after a 100-second sleep; the list
# always holds the URLs that have not been executed yet
def stopExecutor(executor):
    filePath = r"C:\logs\serverStopLog.txt"  # raw string keeps the backslashes literal
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            executor.shutdown()
            time.sleep(100)
            triggerFunc()  # re-run with the URLs that have not completed yet
            break

def load_url(url, timeout):
    conn = urllib.request.urlopen('http://localhost:' + str(WEBSERVER_PORT) + "/" + url,
                                  timeout=timeout)
    return conn.info()

def triggerFunc():
    global shutdown  # needed so "shutdown = True" below updates the module-level flag
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory}

        t = threading.Thread(target=stopExecutor, args=(executor,))  # args must be a one-element tuple
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
                listOfFilesFromDirectory.remove(url)
            else:
                # every branch here did the same thing, so just drop the completed URL
                listOfFilesFromDirectory.remove(url)
        shutdown = True
        t.join()                


triggerMethod(r"C:\inetpub\wwwroot\document.txt")  # raw string; document.txt holds the URL list
triggerFunc()

Upvotes: 2

Views: 3153

Answers (1)

dano

Reputation: 94891

You can't cancel or pause/resume threads in Python. executor.shutdown() does exactly what you said it does when you quoted the documentation:

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Note the last clause: the executor will only shut down once all currently executing tasks are completed. To get the kind of control you want, you'll need to run the urllib call in a separate process, like this (this is a simplified version of your script):

import time
import os.path
import threading
import urllib.request
import multiprocessing
import concurrent.futures
from multiprocessing import cpu_count

shutdown = False
should_cancel = False
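# Note: listOfFilesFromDirectory and WEBSERVER_PORT are assumed to come from
# your original script (the settings module and the URL list built by triggerMethod).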

def stopTasks():
    global should_cancel
    filePath = r"C:\logs\serverStopLog.txt"  # raw string keeps the backslashes literal
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            should_cancel = True
            break

def _load_url(url, timeout, q):
    conn = urllib.request.urlopen('http://localhost:' + str(WEBSERVER_PORT) +
                                  "/" + url, timeout=timeout)
    q.put(conn.info())

def load_url(url, timeout):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_load_url, args=(url, timeout, q))
    p.start()
    while p.is_alive():
        time.sleep(.5)
        if should_cancel:
            p.terminate()  # This will actually kill the process, cancelling the operation
            p.join()       # Reap the terminated process
            break  # You could return something here that indicates it was cancelled, too.
    else:
        # We'll only enter this if we didn't `break` above.
        out = q.get()
        p.join()
        return out

def triggerFunc():
    global shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60):
                             url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopTasks)
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            info = future.result()
            print("done: {}".format(info))
            # other stuff you do
        shutdown = True
        t.join()

if __name__ == "__main__":
    triggerFunc()

Because we can actually kill a sub-process by sending it a SIGTERM (terminate() uses the Windows equivalent, TerminateProcess, on that platform), we can truly cancel the urlopen operation while it's still in progress.
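If you want to see that cancellation in isolation, here is a minimal sketch (independent of the script above; the names are illustrative) showing that terminate() interrupts a worker mid-task, which no thread-based approach can do:

import time
import multiprocessing

def long_task():
    # stands in for a urlopen call that is still waiting on the server
    time.sleep(60)

if __name__ == "__main__":
    p = multiprocessing.Process(target=long_task)
    p.start()
    time.sleep(1)
    p.terminate()  # kills the worker; the 60-second sleep never finishes
    p.join()
    print("cancelled after ~1 second; alive =", p.is_alive())  # prints False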

Upvotes: 1
