Reputation: 876
My shared dictionary object has an inconsistent number of entries. It should have 500, but most runs end up with somewhere between 450 and 465. I've also tried using `map` and `Process` instead of `apply_async`. `map` is slightly better, with around 480 entries instead of around 450, but it's still inconsistent and not all 500 as expected. `Process` results in the fewest entries in my shared dictionary, around 420.

Here's the full code using `apply_async`:
import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool

def processImage(path, d):
    image = np.array(Image.open(source + "/" + path))
    # Copy lists from shared dictionary since updates don't work otherwise
    w = d["width"]
    h = d["height"]
    w.append(image.shape[0])
    h.append(image.shape[1])
    d["width"] = w
    d["height"] = h

if __name__ == "__main__":
    source = "./sample/images"
    p = Pool()
    m = Manager()
    d = m.dict()
    d["width"], d["height"] = [], []
    for path in listdir(source):
        p.apply_async(processImage, (path, d))
    p.close()
    p.join()
Here's the full code using `map`:
import itertools

def processImage(obj):
    d, path = obj
    image = np.array(Image.open(source + "/" + path))
    w = d["width"]
    h = d["height"]
    w.append(image.shape[0])
    h.append(image.shape[1])
    d["width"] = w
    d["height"] = h

if __name__ == "__main__":
    source = "./sample/images"
    p = Pool()
    m = Manager()
    d = m.dict()
    d["width"], d["height"] = [], []
    p.map(processImage, zip(itertools.repeat(d), listdir(source)))
Here's the full code using `Process`:
from multiprocessing import Process

def processImage(path, d):
    image = np.array(Image.open(source + "/" + path))
    w = d["width"]
    h = d["height"]
    w.append(image.shape[0])
    h.append(image.shape[1])
    d["width"] = w
    d["height"] = h

if __name__ == "__main__":
    source = "./sample/images"
    m = Manager()
    d = m.dict()
    d["width"], d["height"] = [], []
    jobs = []
    for img in listdir(source):
        j = Process(target=processImage, args=(img, d))
        j.start()
        jobs.append(j)
    for j in jobs:
        j.join()
Upvotes: 1
Views: 467
Reputation: 15738
This is a classic example of a race condition. You need some kind of synchronization primitive to update `d`.
Consider the following situation: two workers (subprocesses in your case) are executing `processImage`. The first obtains `w` and `h`; the second obtains `w` and `h`. The first appends something to both and puts them back into `d`. The second appends to its own copies of `w` and `h`, which no longer reflect the changes made by the first worker, and puts them back into `d`. At that point, the changes made by the first worker are lost.
To fix that, you need to guard the parts of the code that work with `d`:
from multiprocessing import Manager, Pool, Lock
...
lock = Lock()
...
def processImage(path, d):
    image = np.array(Image.open(source + "/" + path))
    with lock:
        # Re-read, modify, and write back while holding the lock.
        # Appending to the retrieved copy alone would not update the
        # managed dict, as you already noticed in your own code.
        w = d["width"]
        h = d["height"]
        w.append(image.shape[0])
        h.append(image.shape[1])
        d["width"] = w
        d["height"] = h
Upvotes: 3