tombird
tombird

Reputation: 95

Avoiding loading spaCy data in each subprocess when multiprocessing

I would like to use spaCy in a program which is currently implemented with multiprocessing. Specifically I am using ProcessingPool to spawn 4 subprocesses which then go off and do their merry tasks.

To use spaCy (specifically for POS tagging), I need to invoke spacy.load('en'), which is an expensive call (takes ~10 seconds). If I am to load this object within each subprocess then it will take ~40 seconds, as they are all reading from the same location. This is annoyingly long.

But I cannot figure out a way to get them to share the object which is being loaded. This object cannot be pickled, which means (as far as I know):

  1. It cannot be passed into the Pool.map call
  2. It cannot be stored and used by a Manager instance to then be shared amongst the processes

What can I do?

Upvotes: 3

Views: 1040

Answers (1)

amirouche
amirouche

Reputation: 7883

I don't how you use Pool.map exactly but be aware that Pool.map doesn't work with a massive number of input. In Python 3.6, it's implemented in Lib/multiprocessing/pool.py as you can see, it states it takes an iterable as first argument but the implementation does consume the whole iterable before running the multiprocess map. So I think that's not Pool.map that you need to use if you need to process a lot of data. Maybe Pool.imap and Pool.imap_unordered can work.

About your actual issue. I have a solution that doesn't involve Pool.map and works kind of a like multiprocess foreach.

First you need to inherit Pool and create a worker process:

from multiprocessing import cpu_count
from multiprocessing import Queue
from multiprocessing import Process


class Worker(Process):

    english = spacy.load('en')

    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        for args in iter(self.queue.get, None):
            # process args here, you can use self.

You prepare the pool of processus like that:

queue = Queue()
workers = list()
for _ in range(cpu_count()):  # minus one if the main processus is CPU intensive
    worker = Worker(queue)
    workers.append(worker)
    worker.start()

Then you can feed the pool via queue:

for args in iterable:
    queue.put(args)

iterable is the list of arguments that you pass to the workers. The above code will push the content of iterable as fast as it can. Basically, if the worker is slow enough, almost all the iterable will be pushed to the queue before the workers have finished their job. That's why the content of the iterable must fit into memory.

If the workers arguments (aka. iterable) can't fit into memory you must synchronize somehow the main processus and the workers...

At the end make sure to call the following:

for worker in workers:
    queue.put(None)

for worker in workers:
    worker.join()

Upvotes: 2

Related Questions