I'm working on a large script whose main purpose is to read the contents of many files and store the number of occurrences of each element in a dictionary. If an element is absent from the dictionary, we create a new instance of some object and then increment; otherwise we only increment. Since each of the files to process is huge in itself, and sometimes I need to process 100+ of them, I wanted to speed things up a little and take advantage of Python's multiprocessing module. Here is a greatly simplified version of the script (I hid the path with ..., it's not the real one):
import multiprocessing as mp
from os import listdir
from os.path import join

manager = mp.Manager()
queue = manager.Queue()
dictionary = manager.dict()

class TestClass:
    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(file):
    f = open(file, 'r')
    for line in f.readlines():
        if line not in dictionary:
            dictionary[line] = TestClass()
        dictionary[line].increment()

def _list_files():
    for f in listdir("..."):
        queue.put(join("...", f))

def pool():
    _list_files()
    _pool = mp.Pool(mp.cpu_count())
    for i in range(len(queue)):
        _pool.apply(worker, args=(queue.get()))
    _pool.close()
    _pool.join()

pool()
print(dictionary)
The problem is that the script crashes with the message:
AttributeError: Can't get attribute 'TestClass' on <module '__main__' from '.../multiprocessing_test.py'>
Is there any way that I can get this to work?
I'm not the one who created the initial version of the script; I'm just adding some functionality to it. Given that, the structure of the script must stay the same, because rewriting it would take too much time: that is, TestClass, worker and _list_files can't change their structure (except for the things connected with multiprocessing).
(It seems like you posted this question before.)
Your example code is nonfunctional for a bunch of reasons, not least of which is that "..." just does not do anything useful:
$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 38, in <module>
    pool()
  File "tst.py", line 29, in pool
    _list_files()
  File "tst.py", line 25, in _list_files
    for f in listdir("..."):
OSError: [Errno 2] No such file or directory: '...'
(It's not good form to post code that won't run, but it is a good idea to provide an MCVE.) So I fixed that:
index 39014ff..1ac9f4a 100644
--- a/tst.py
+++ b/tst.py
@@ -2,6 +2,8 @@ import multiprocessing as mp
 from os import listdir
 from os.path import join
 
+DIRPATH = 'inputs'
+
 manager = mp.Manager()
 queue = manager.Queue()
 dictionary = manager.dict()
@@ -22,8 +24,8 @@ def worker(file):
         dictionary[line].increment()
 
 def _list_files():
-    for f in listdir("..."):
-        queue.put(join("...", f))
+    for f in listdir(DIRPATH):
+        queue.put(join(DIRPATH, f))
 
 def pool():
     _list_files()
and created an inputs/ directory with one sample input file:
$ ls inputs
one
$ cat inputs/one
1
one
unum
and now this example produces:
$ python tst.py
Traceback (most recent call last):
  File "tst.py", line 40, in <module>
    pool()
  File "tst.py", line 34, in pool
    for i in range(len(queue)):
TypeError: object of type 'AutoProxy[Queue]' has no len()
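As an aside, a managed Queue has no __len__ at all; the closest substitute is qsize(), and even that is only advisory once multiple processes are pushing and pulling concurrently. A minimal sketch (my illustration, not part of the original code):

import multiprocessing as mp

if __name__ == '__main__':
    manager = mp.Manager()
    queue = manager.Queue()
    queue.put('a')
    print(queue.qsize())  # 1 -- a snapshot only; other processes may race
    # len(queue)          # TypeError: object of type 'AutoProxy[Queue]' has no len()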
Now, I won't claim that this rewrite is good, but I went ahead and rewrote this into something that does work:
import multiprocessing as mp
from os import listdir
from os.path import join

DIRPATH = 'inputs'

class TestClass:
    def __repr__(self):
        return str(self._number)

    def __init__(self):
        self._number = 0

    def increment(self):
        self._number += 1

def worker(dictionary, queue):
    # Each worker loops, pulling file names until it reads the None sentinel.
    while True:
        path = queue.get()
        if path is None:
            return
        f = open(path, 'r')
        for line in f.readlines():
            if line not in dictionary:
                dictionary[line] = TestClass()
            dictionary[line].increment()

def run_pool():
    manager = mp.Manager()
    queue = manager.Queue()
    dictionary = manager.dict()
    nworkers = mp.cpu_count()
    pool = mp.Pool(nworkers)
    for i in range(nworkers):
        pool.apply_async(worker, args=(dictionary, queue))
    for f in listdir(DIRPATH):
        queue.put(join(DIRPATH, f))
    for i in range(nworkers):
        queue.put(None)  # one sentinel per worker
    pool.close()
    pool.join()
    return dictionary

def main():
    dictionary = run_pool()
    print(dictionary)

if __name__ == '__main__':
    main()
The main differences are:

- I removed all the global variables. The manager instance, the managed queue, and the managed dictionary are all local to run_pool.
- I put the names of the files into the queue after creating nworkers workers. Each worker runs a loop, reading file names, until it reads a None name, then returns its (None) result.
- The main loop drops the file names into the queue, so that workers can pull file names out of the queue as they finish each previous file. To signal all nworkers workers to exit, the main loop adds that many None entries to the queue (see the sketch after this list).
- run_pool returns the final (still managed) dictionary.
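Here is that sentinel pattern in isolation, as a minimal sketch of my own (not part of the original rewrite), with a hypothetical list of work items standing in for the file names:

import multiprocessing as mp

def worker(queue):
    # Keep pulling items until the None sentinel arrives, then exit.
    while True:
        item = queue.get()
        if item is None:
            return
        print('processing', item)

if __name__ == '__main__':
    manager = mp.Manager()
    queue = manager.Queue()
    nworkers = 2
    pool = mp.Pool(nworkers)
    for _ in range(nworkers):
        pool.apply_async(worker, args=(queue,))
    for item in ['a', 'b', 'c']:  # hypothetical work items
        queue.put(item)
    for _ in range(nworkers):     # one None per worker
        queue.put(None)
    pool.close()
    pool.join()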
And of course I added a __repr__ to your TestClass object so that we can see the counts. I also made sure the code would work on Windows, by moving the main driver into a function that is called only if __name__ == '__main__'.
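For what it's worth, here is a minimal sketch (my addition, not from the original answer) of why that guard matters: under the 'spawn' start method, which Windows uses by default, each worker process re-imports the main script, so any unguarded module-level code would run again in every child.

import multiprocessing as mp

def work(x):
    return x * x

if __name__ == '__main__':
    # Without this guard, the Pool creation below would re-execute in every
    # spawned child, which fails (or recurses) on Windows.
    mp.set_start_method('spawn')          # the Windows default; explicit here
    with mp.Pool(2) as pool:
        print(pool.map(work, [1, 2, 3]))  # [1, 4, 9]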