Reputation: 95
I would like to use spaCy in a program which is currently implemented with multiprocessing. Specifically, I am using ProcessingPool to spawn 4 subprocesses which then go off and do their merry tasks.
To use spaCy (specifically for POS tagging), I need to invoke spacy.load('en'), which is an expensive call (takes ~10 seconds). If I am to load this object within each subprocess then it will take ~40 seconds, as they are all reading from the same location. This is annoyingly long.
But I cannot figure out a way to get them to share the object which is being loaded. This object cannot be pickled, which means (as far as I know):

- it cannot be passed into the Pool.map call
- it cannot be used by or accessed through a Manager instance to then be shared amongst the processes

What can I do?
Upvotes: 3
Views: 1040
Reputation: 7883
I don't know how you use Pool.map exactly, but be aware that Pool.map doesn't work well with a massive amount of input. In Python 3.6 it's implemented in Lib/multiprocessing/pool.py. As you can see, it states that it takes an iterable as its first argument, but the implementation consumes the whole iterable before running the multiprocess map. So I don't think Pool.map is what you need if you have to process a lot of data. Maybe Pool.imap and Pool.imap_unordered can work.
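Just as an illustration, here is a minimal sketch of Pool.imap_unordered; the names init_worker and pos_tag, and the use of an initializer to load spaCy once per worker process, are my assumptions and not code from your program:

import spacy
from multiprocessing import Pool

def init_worker():
    # assumption: load the model once per worker process
    global nlp
    nlp = spacy.load('en')

def pos_tag(text):
    # hypothetical per-item work using the worker-local model
    return [(token.text, token.pos_) for token in nlp(text)]

if __name__ == '__main__':
    texts = ["A first sentence.", "A second sentence."]
    with Pool(processes=4, initializer=init_worker) as pool:
        # imap_unordered pulls from `texts` lazily instead of all at once
        for tags in pool.imap_unordered(pos_tag, texts):
            print(tags)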
About your actual issue: I have a solution that doesn't involve Pool.map and works kind of like a multiprocess foreach.
First you need to inherit Process and create a worker process:
import spacy

from multiprocessing import cpu_count
from multiprocessing import Queue
from multiprocessing import Process

class Worker(Process):
    # loaded once when the class body is executed; worker processes
    # created by fork inherit it instead of loading their own copy
    english = spacy.load('en')

    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        for args in iter(self.queue.get, None):
            # process args here; the model is available as self.english
            pass
You prepare the pool of processes like this:
queue = Queue()
workers = list()

for _ in range(cpu_count()):  # minus one if the main process is CPU intensive
    worker = Worker(queue)
    workers.append(worker)
    worker.start()
Then you can feed the pool via queue:
for args in iterable:
    queue.put(args)
Here iterable is the list of arguments that you pass to the workers. The above code will push the contents of iterable as fast as it can. Basically, if the workers are slow enough, almost all of the iterable will be pushed onto the queue before the workers have finished their job. That's why the contents of the iterable must fit into memory.
If the workers' arguments (i.e. iterable) can't fit into memory, you must somehow synchronize the main process and the workers...
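One simple way to do that synchronization, just as a sketch (the maxsize value below is an arbitrary example, not something from the code above), is to bound the queue so that queue.put() blocks once too many items are pending:

from multiprocessing import Queue

# put() blocks when 1000 items are already queued, so the main process
# can never run far ahead of the workers
queue = Queue(maxsize=1000)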
At the end make sure to call the following:
for worker in workers:
    queue.put(None)

for worker in workers:
    worker.join()
Upvotes: 2