Colonder

Reputation: 1576

Share a dictionary storing objects between several processes in Python

I'm working on a large script whose main purpose is to read the contents of many files and store the count of each element in a dictionary. If an element is absent from the dictionary, we create a new instance of some object and then increment it; otherwise we only increment. Since each file is huge and I sometimes need to process 100+ of them, I wanted to speed things up by taking advantage of Python's multiprocessing module. Here is a heavily simplified version of the script (I replaced the path with ...; it's not the real one):

import multiprocessing as mp
from os import listdir
from os.path import join

manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()

class TestClass:
    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(file):
    f = open(file, 'r')
    for line in f.readlines():
        if line not in dictionary:
            dictionary[line] = TestClass()

        dictionary[line].increment()

def _list_files():
    for f in listdir("..."):
        queue.put(join("...", f))

def pool():
    _list_files()
    _pool = mp.Pool(mp.cpu_count())    

    for i in range(len(queue)):
        _pool.apply(worker, args=(queue.get()))

    _pool.close()
    _pool.join()

pool()
print(dictionary)

The problem is that the script crashes with message:

AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>  

Is there any way that I can get this to work?
I'm not the one who created the initial version of the script; I'm just adding some functionality to it. Given that, the structure of the script must stay the same, because rewriting it would take too much time. That is, TestClass, worker and _list_files can't change their structure (except for anything connected with multiprocessing).

Upvotes: 1

Views: 470

Answers (1)

torek

Reputation: 488183

(It seems like you posted this question before.)

Your example code is nonfunctional for a bunch of reasons, not least of which is that ... just does not do anything useful:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 38, in <module>
    pool()
  File "tst.py", line 29, in pool
    _list_files()
  File "tst.py", line 25, in _list_files
    for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'

(It's not good form to post code that won't run, but it is a good idea to provide an MCVE.) So I fixed that:

index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
 from os import listdir
 from os.path import join

+DIRPATH = 'inputs'
+
 manager = mp.Manager()
 queue = manager.Queue()
 dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
         dictionary[line].increment()

 def _list_files():
-    for f in listdir("..."):
-        queue.put(join("...", f))
+    for f in listdir(DIRPATH):
+        queue.put(join(DIRPATH, f))

 def pool():
     _list_files()

and created an inputs/ directory with one sample input file:

$ ls inputs
one
$ cat inputs/one
1
one
unum

and now this example produces:

$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 40, in <module>
    pool()
  File "tst.py", line 34, in pool
    for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()
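
(The managed queue is a proxy object and does not implement len(). If you really want a count, the proxy does forward qsize(), though on a shared queue that number is only a snapshot. A minimal sketch, with made-up file names and a hypothetical helper name:)

```python
import multiprocessing as mp


def queue_size_demo():
    """Show that a managed queue rejects len() but answers qsize()."""
    with mp.Manager() as manager:
        queue = manager.Queue()
        for name in ("a.txt", "b.txt", "c.txt"):  # hypothetical file names
            queue.put(name)

        try:
            len(queue)
            len_works = True
        except TypeError:
            # The queue proxy has no __len__ method.
            len_works = False

        return len_works, queue.qsize()


if __name__ == "__main__":
    print(queue_size_demo())
```

Looping over a fixed count taken from qsize() is fragile once several processes consume the queue, which is one reason the rewrite below uses a sentinel value instead.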

Now, I won't claim that this rewrite is good, but I went ahead and rewrote this into something that does work:

import multiprocessing as mp
from os import listdir
from os.path import join

DIRPATH = 'inputs'

class TestClass:
    def __repr__(self):
        return str(self._number)

    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

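def worker(dictionary, queue):
    while True:
        path = queue.get()
        if path is None:  # sentinel: no more files to process
            return
        with open(path, 'r') as f:
            for line in f:
                # Indexing a managed dict returns a copy, so fetch the
                # object, update it, and store it back explicitly.
                # Note: this read-modify-write is not atomic across workers.
                counter = dictionary.get(line)
                if counter is None:
                    counter = TestClass()
                counter.increment()
                dictionary[line] = counter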

def run_pool():
    manager = mp.Manager()
    queue = manager.Queue()
    dictionary = manager.dict()
    nworkers = mp.cpu_count()
    pool = mp.Pool(nworkers)

    for i in range(nworkers):
        pool.apply_async(worker, args=(dictionary, queue))

    for f in listdir(DIRPATH):
        queue.put(join(DIRPATH, f))
    for i in range(nworkers):
        queue.put(None)

    pool.close()
    pool.join()

    return dictionary

def main():
    dictionary = run_pool()
    print(dictionary)

if __name__ == '__main__':
    main()

The main differences are:

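  • I removed all the global variables. The manager instance, the managed queue, and the managed dictionary are all local to run_pool. (With the fork start method, a module-level mp.Manager() starts its server process before TestClass is defined, so the server cannot unpickle TestClass instances; that is the likely cause of your AttributeError.)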

  • I put the names of the files into the queue after creating nworkers workers. Each worker runs a loop, reading file names, until it reads a None name, then returns its (None) result.

  • The main loop drops the file names into the queue, so that workers can pull file names out of the queue as they finish each previous file. To signal all nworkers workers to exit, the main loop adds that many None entries to the queue.

  • run_pool returns the final (still managed) dictionary.
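
A side note on those (None) results: apply_async returns an AsyncResult, and an exception raised inside a worker only surfaces in the parent when you call .get() on it. The code above discards the results, so a failing worker would go unnoticed. Here is a minimal sketch of collecting them; failing_worker and collect_errors are hypothetical names, and the worker fails on purpose:

```python
import multiprocessing as mp


def failing_worker(path):
    """Hypothetical worker that always fails, to show error propagation."""
    raise ValueError("cannot process " + path)


def collect_errors(paths):
    """Run one task per path and gather the error messages in order."""
    errors = []
    with mp.Pool(2) as pool:
        results = [pool.apply_async(failing_worker, args=(p,)) for p in paths]
        for result in results:
            try:
                # get() re-raises the worker's exception in the parent.
                result.get(timeout=30)
            except ValueError as exc:
                errors.append(str(exc))
    return errors


if __name__ == "__main__":
    print(collect_errors(["one", "two"]))
```

In run_pool the same idea would mean keeping the AsyncResult objects in a list and calling .get() on each after the sentinels have been queued.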

And of course I added a __repr__ to your TestClass object so that we can see the counts. I also made sure that the code should work on Windows by moving the main driver into a function, called only if __name__ == '__main__'.
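
One caveat to keep in mind with manager.dict(): indexing it returns a pickled copy of the stored object, so a call like dictionary[line].increment() mutates the copy and the manager never sees the change. The object has to be assigned back to the dict, and because fetch, modify, and store are separate steps, concurrent workers can still lose updates unless they are serialized. The following is a rough sketch only; Counter, bump, and demo are hypothetical names, Counter stands in for TestClass, and manager.Lock() is just one way to serialize the update:

```python
import multiprocessing as mp


class Counter:
    """Hypothetical stand-in for TestClass with a readable attribute."""

    def __init__(self):
        self.number = 0

    def increment(self):
        self.number += 1


def bump(shared, lock, key):
    """Fetch, modify, and store back under a lock so no update is lost."""
    with lock:
        item = shared.get(key)
        if item is None:
            item = Counter()
        item.increment()
        shared[key] = item  # explicit write-back to the manager


def demo():
    with mp.Manager() as manager:
        shared = manager.dict()
        lock = manager.Lock()

        shared["key"] = Counter()
        # Mutates a temporary copy only; the stored object stays at 0.
        shared["key"].increment()
        lost = shared["key"].number

        # With the write-back in bump(), both increments are kept.
        bump(shared, lock, "key")
        bump(shared, lock, "key")
        kept = shared["key"].number
        return lost, kept


if __name__ == "__main__":
    print(demo())
```

Each access goes through the manager process, so holding a lock per line is slow for huge files; counting locally in each worker and merging once per file would reduce that traffic, at the cost of changing worker more than you may want.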

Upvotes: 2
