Pierre D

Reputation: 26321

Python 3 concurrent.futures and per-thread initialization

In Python 3, is it possible to use a subclass of Thread with a concurrent.futures.ThreadPoolExecutor, so that each worker thread can be initialized individually before processing (presumably many) work items?

I'd like to use the convenient concurrent.futures API for a piece of code that syncs files with S3 objects (each work item is one file, to be synced if the corresponding S3 object is missing or out of sync). I would like each worker thread to do some initialization first, such as setting up a boto3.session.Session. The pool of workers would then be ready to process potentially thousands of work items (files to sync).

BTW, if a thread dies for some reason, is it reasonable to expect a new thread to be automatically created and added back to the pool?

(Disclaimer: I am much more familiar with Java's multithreading framework than with Python's.)

Upvotes: 5

Views: 4600

Answers (1)

Pierre D

Reputation: 26321

It seems that a simple solution to my problem is to use threading.local to store a per-thread "session" (in the mockup below, just a random int). Each worker thread creates its session lazily, the first time it picks up a work item, and reuses it afterwards. It is perhaps not the cleanest approach, but for now it will do. Here is a mockup (Python 3.5.1):

import time
import threading
import concurrent.futures
import random
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')

x = [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]

mydata = threading.local()

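def do_work(secs):
    # Attributes set on a threading.local object are visible only to the
    # thread that set them, so each worker thread gets its own session.
    if hasattr(mydata, 'session'):
        logging.debug('re-using session "{}"'.format(mydata.session))
    else:
        # First work item handled by this thread: create its session.
        mydata.session = random.randint(0, 1000)
        logging.debug('created new session: "{}"'.format(mydata.session))
    time.sleep(secs)
    logging.debug('slept for {} seconds'.format(secs))
    return secs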

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    y = executor.map(do_work, x)

print(list(y))

This produces the following output, showing that "sessions" are indeed local to each thread and reused:

(Thread-1) 29 - created new session: "855"
(Thread-2) 29 - created new session: "58"
(Thread-3) 30 - created new session: "210"
(Thread-1) 129 - slept for 0.1 seconds
(Thread-1) 130 - re-using session "855"
(Thread-2) 130 - slept for 0.1 seconds
(Thread-2) 130 - re-using session "58"
(Thread-3) 230 - slept for 0.2 seconds
(Thread-3) 230 - re-using session "210"
(Thread-3) 331 - slept for 0.1 seconds
(Thread-3) 331 - re-using session "210"
(Thread-3) 331 - slept for 0.0 seconds
(Thread-1) 530 - slept for 0.4 seconds
(Thread-2) 1131 - slept for 1.0 seconds
[0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]
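
As an alternative to the lazy check inside do_work, Python 3.7 and later let ThreadPoolExecutor take initializer and initargs arguments, which run a callable once in each worker thread as it starts. Here is a minimal sketch of that variant; the names init_worker, created and the string "session" values are made up for illustration, and a real version would presumably build a boto3.session.Session in their place:

```python
import concurrent.futures
import threading

local = threading.local()
created = []                      # sessions created so far, one per worker thread
created_lock = threading.Lock()

def init_worker(prefix):
    # Runs once in each worker thread, before that thread takes any work item.
    # The string below is a stand-in for a real per-thread session object.
    local.session = '{}-{}'.format(prefix, threading.current_thread().name)
    with created_lock:
        created.append(local.session)

def do_work(item):
    # Every call sees the session set up by its own thread's initializer.
    return item, local.session

with concurrent.futures.ThreadPoolExecutor(
        max_workers=3, initializer=init_worker, initargs=('session',)) as executor:
    results = list(executor.map(do_work, range(10)))

for item, session in results:
    print(item, session)
```

Worker threads are started on demand, so fewer than max_workers sessions may be created if the work items finish quickly. On Python versions before 3.7 (including the 3.5.1 used above), these arguments do not exist and the threading.local check remains the way to go.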

Minor note about logging: in order to use this in an IPython notebook, the logging setup needs to be slightly modified (since IPython has already set up a root logger). A more robust logging setup would be:

IN_IPYNB = 'get_ipython' in vars()

if IN_IPYNB:
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    for h in logger.handlers:
        h.setFormatter(logging.Formatter(
                '(%(threadName)-0s) %(relativeCreated)d - %(message)s'))
else:
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')
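
Regarding the side question about a thread dying: as far as I can tell, an exception raised by a work item does not kill the worker thread. The executor catches it and stores it on the corresponding Future, and the same thread goes on to the next item. The sketch below illustrates this with a single worker; risky is a made-up function used only for the demonstration:

```python
import concurrent.futures
import threading

def risky(n):
    # Fails for one work item, succeeds for the others.
    if n == 2:
        raise ValueError('bad item {}'.format(n))
    return n, threading.current_thread().name

# A single worker makes it obvious whether the thread survives the failure.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(risky, n) for n in range(4)]

for f in futures:
    err = f.exception()           # None if the work item succeeded
    if err is not None:
        print('failed:', repr(err))
    else:
        print('ok:', f.result())
```

The item submitted after the failing one is processed by the same thread, so any per-thread session stored in threading.local is still available to it. Note that executor.map re-raises the exception when the failing result is retrieved from the iterator, so submit is more convenient when some items are expected to fail.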

Upvotes: 4
