aweeeezy

Reputation: 876

Python multiprocessing Pool / Process has inconsistent results

My shared dictionary object has an inconsistent number of entries. It should have 500, but most tests end up with somewhere between 450 and 465. I've also tried using map and Process instead of apply_async.

map is slightly better because the shared dictionary has around 480 entries instead of around 450, but it's still inconsistent and not all 500 as expected.

I also tried using Process, but that results in the fewest entries in my shared dictionary -- around 420.

Here's the full code using apply_async:

import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool

def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))

  # Copy lists from shared dictionary since updates don't work otherwise
  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  for path in listdir(source):
    p.apply_async(processImage, (path, d))
  p.close()
  p.join()

Here's the full code using map:

import itertools

def processImage(obj):
  image = np.array(Image.open(source + "/" + obj[1]))

  w = obj[0]["width"]
  h = obj[0]["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  obj[0]["width"] = w
  obj[0]["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  p.map(processImage, zip(itertools.repeat(d), listdir(source)))

Here's the full code for using Process:

from multiprocessing import Manager, Process

def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))

  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h

if __name__ == "__main__":
  source = "./sample/images"
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []

  jobs = []
  for img in listdir(source):
    p = Process(target=processImage, args=(img, d))
    p.start()
    jobs.append(p)

  for j in jobs:
    j.join()

Upvotes: 1

Views: 467

Answers (1)

Marat

Reputation: 15738

This is a classic example of a race condition. You need some kind of synchronization primitive to serialize updates to d.

Consider the following situation: two workers (well, subprocesses in your case) are executing processImage at the same time. The first obtains w and h; the second obtains the same w and h. The first appends its values and writes the lists back to d. The second then writes back its own w and h, which no longer reflect the changes made by the first worker. At that point, the first worker's updates are lost.

To fix that, guard the parts of the code that work with d with a lock, and pass that lock to every worker (e.g. p.apply_async(processImage, (path, d, lock))):

from multiprocessing import Manager, Pool
...
lock = m.Lock()  # a Manager lock can be passed to pool workers as an argument
...
def processImage(path, d, lock):
    image = np.array(Image.open(source + "/" + path))

    with lock:
        # read, modify and write back while holding the lock, so no other
        # worker can interleave between the read and the write; note that
        # d["width"].append(...) alone would not persist, because the
        # managed dict hands back a copy of the stored list
        w = d["width"]
        h = d["height"]
        w.append(image.shape[0])
        h.append(image.shape[1])
        d["width"] = w
        d["height"] = h

Upvotes: 3
