hunyi
hunyi

Reputation: 63

Python multiprocessing pool some process in deadlock when forked but runs when spawned

So I tried to experiment with some service downloading and resizing images (using threads for downloading the images and processes to resize them). I fire up the download threads (with a manager thread that will watch them) and as soon as an image is saved locally I add its path to a Queue. The manager thread will add a poison pill to the queue when all images are downloaded.

The main thread meanwhile watches the queue and gets the paths from it as they are downloaded and it fires up a new async process from a pool to resize the image.

At the end when I try to join the pool it hangs sometimes, seems to be a deadlock. It does not happen every time but the more url in the IMG_URLS list are the more often it happens. In case of this deadlock happens the logs tell us that some processes were not properly started or are in a deadlock immediately because the "resizing {file}" log does not appear for them.

import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")


class Service(object):
    def __init__(self):
        self.img_queue = Queue()

    def download_image(self, url) -> None:
        logging.info(f"downloading image from URL {url}")
        time.sleep(1)
        file = f"local-{url}"
        self.img_queue.put(file)
        logging.info(f"image saved to {file}")

    def download_images(self, img_url_list: list):
        logging.info("beginning image downloads")

        threads = []
        for url in img_url_list:
            t = Thread(target=self.download_image, args=(url,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()
        logging.info("all images downloaded")
        self.img_queue.put(None)

    def resize_images(self):
        logging.info("beginning image resizing")
        with mp.Pool() as p:
            while True:
                file = self.img_queue.get()
                if file is None:
                    logging.info("got SENTINEL")
                    break
                logging.info(f"got {file}")
                p.apply_async(func=resize_image, args=(file,))
            p.close()
            p.join()
        logging.info("all images resized")

    def run(self, img_url_list):
        logging.info("START service")

        dl_manager_thread = Thread(target=self.download_images, args=(img_url_list,))
        dl_manager_thread.start()
        self.resize_images()

        logging.info(f"END service")


if __name__ == "__main__":
    FORMAT = "[%(threadName)s, %(asctime)s, %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)

    IMG_URLS = list(range(8))

    service = Service()
    service.run(IMG_URLS)

When running this with python 3.8.5 (Ubuntu 20.04, Ryzen 2600). I get the following:

[MainThread, 2020-11-30 19:58:01,257, INFO] START service
[Thread-1, 2020-11-30 19:58:01,257, INFO] beginning image downloads
[MainThread, 2020-11-30 19:58:01,257, INFO] beginning image resizing
[Thread-2, 2020-11-30 19:58:01,258, INFO] downloading image from URL 0
[Thread-3, 2020-11-30 19:58:01,258, INFO] downloading image from URL 1
[Thread-4, 2020-11-30 19:58:01,258, INFO] downloading image from URL 2
[Thread-5, 2020-11-30 19:58:01,259, INFO] downloading image from URL 3
[Thread-6, 2020-11-30 19:58:01,260, INFO] downloading image from URL 4
[Thread-7, 2020-11-30 19:58:01,260, INFO] downloading image from URL 5
[Thread-8, 2020-11-30 19:58:01,261, INFO] downloading image from URL 6
[Thread-9, 2020-11-30 19:58:01,262, INFO] downloading image from URL 7
[Thread-2, 2020-11-30 19:58:02,259, INFO] image saved to local-0
[MainThread, 2020-11-30 19:58:02,260, INFO] got local-0
[Thread-3, 2020-11-30 19:58:02,260, INFO] image saved to local-1
[Thread-4, 2020-11-30 19:58:02,260, INFO] image saved to local-2
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-1
[MainThread, 2020-11-30 19:58:02,261, INFO] resizing local-0
[Thread-5, 2020-11-30 19:58:02,261, INFO] image saved to local-3
[Thread-6, 2020-11-30 19:58:02,261, INFO] image saved to local-4
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-2
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-3
[MainThread, 2020-11-30 19:58:02,262, INFO] resizing local-1
[Thread-7, 2020-11-30 19:58:02,262, INFO] image saved to local-5
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-3
[Thread-8, 2020-11-30 19:58:02,263, INFO] image saved to local-6
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-6
[MainThread, 2020-11-30 19:58:02,264, INFO] resizing local-6
[Thread-9, 2020-11-30 19:58:02,264, INFO] image saved to local-7
[MainThread, 2020-11-30 19:58:02,265, INFO] got local-7
[Thread-1, 2020-11-30 19:58:02,265, INFO] all images downloaded
[MainThread, 2020-11-30 19:58:02,265, INFO] got SENTINEL
[MainThread, 2020-11-30 19:58:02,265, INFO] resizing local-7
[MainThread, 2020-11-30 19:58:02,362, INFO] done resizing local-0
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-1
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-3
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-4
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-5
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-6
[MainThread, 2020-11-30 19:58:02,366, INFO] done resizing local-7

And sometimes here it starts hanging. Notice that resizing local-2 log is missing, so that process didn't start or it waits for something.

If I change the pool to use spawning not forking, it works fine. I guess the fork copies some lock in an acquired state in some cases and that is the issue, but I don't clearly see where and why.

with mp.get_context("spawn").Pool() as p:

Any idea?

Upvotes: 4

Views: 3616

Answers (2)

hunyi
hunyi

Reputation: 63

Just some extra info to extend the great answer from Aaron.

This python bug/enhancement seems to be the exact same thing: https://bugs.python.org/issue6721

which I found in another question asking the same thing: Deadlock with logging multiprocess/multithread python script

Upvotes: 2

Aaron
Aaron

Reputation: 11075

Sometimes (when you're unlucky) as your pool is spinning up, one of the child processes will be "fork"ed while your downloader thread is writing a message to the logging module. The logging module uses a queue protected by a lock in order to pass messages around, so when "fork" happens, that lock can get copied in the locked state. Then when the download thread is done writing its message to the queue, only the lock on the main process gets released, so you're left with a subprocess waiting on the copy of that lock to write a message to logging. That lock can never be released because the downloader thread does not get copied (fork doesn't copy threads). This is the deadlock that occurs. This type of error can be patched in some ways, but is one of the reasons "spawn" exists.

Additionally, "spawn" is the only method supported by all architectures. It is just so easy to use a library that happens to be multithreaded under the hood without realizing it, and "fork" just isn't real multi-threading friendly. I don't have much knowledge of "forkserver" in case you really do need the reduced overhead afforded by "fork". In theory it is a little more multi-threading safe.

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Here's a more in-depth discussion with a few references on this problem which I used as my primary resource

Upvotes: 6

Related Questions