william007

Why do I get different results when using multiprocessing with an instance method vs with a function?

For the following code, which passes an instance method to the Pool, the list is empty at the end of the script:

import time

from multiprocessing import Pool

class Hello:
    def __init__(self):
        self.result_list=[]

    def f(self,x,y):
        time.sleep(2)
        return x*y


    def log_result(self,result):
        # This is called whenever f(x, y) returns a result.
        # result_list is modified only by the main process, not the pool workers.
        print(result)
        self.result_list.append(result)

if __name__ == '__main__':
    pool = Pool()              # starts one worker process per CPU core by default
    h=Hello()
    for i in range(10):
        pool.apply_async(h.f, args = (i,i, ), callback = h.log_result)
    pool.close()
    pool.join()
    print(h.result_list)

With the following code, which passes a module-level function instead, the list is populated as expected:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

What's different about the two? Why doesn't it work with the instance method?

dano

If you actually try to fetch the result of one of your apply_async calls (by calling get() on the AsyncResult object it returns), you'll see that they're all failing with this error:

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

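This is because in Python 2.x, instance methods aren't picklable by default, so passing the instance method h.f to the worker processes fails. apply_async never raises that error itself: it is stored in the AsyncResult, and the callback only runs for tasks that succeed, so log_result is never called and the list stays empty. This is fixed in Python 3, but you can backport the behavior to Python 2 quite easily using the copy_reg module: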

import time

from multiprocessing import Pool
import copy_reg
import types

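def _reduce_method(m):
    # Pickle a method as a getattr() call on its class (unbound method)
    # or on its instance (bound method); both of those are picklable.
    if m.__self__ is None:
        return getattr, (m.im_class, m.__func__.__name__)
    else:
        return getattr, (m.__self__, m.__func__.__name__)

# Register the reducer for all method objects.
copy_reg.pickle(types.MethodType, _reduce_method)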

class Hello:
    def __init__(self):
        self.result_list=[]

    def f(self,x,y):
        time.sleep(2)
        return x*y


    def log_result(self, result):
        print(result)
        self.result_list.append(result)

if __name__ == '__main__':
    pool = Pool()
    h = Hello()
    for i in range(10):
        pool.apply_async(h.f, args = (i,i), callback=h.log_result)
    pool.close()
    pool.join()
    print(h.result_list)

Output:

0
4
49
25
1
16
36
9
64
81
[0, 4, 49, 25, 1, 16, 36, 9, 64, 81]
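The two points above (what actually gets pickled, and why a failed apply_async call is silent) can be checked with a short Python 3 sketch. This is an illustration, not the answerer's code. It assumes a current Python 3, where bound methods pickle without any copy_reg registration, and it uses `multiprocessing.pool.ThreadPool`, which shares `Pool`'s `apply_async` interface but runs tasks in threads, so no worker processes are started and nothing is pickled in the second half. `risky` and `h2` are hypothetical names used only for illustration.

```python
import pickle
from multiprocessing.pool import ThreadPool

class Hello:
    def __init__(self):
        self.result_list = []

    def f(self, x, y):
        return x * y

    def log_result(self, result):
        self.result_list.append(result)

# 1. Bound methods pickle natively in Python 3: h.f is reduced to
#    getattr(<instance>, "f"), so the receiving side gets a *copy* of h.
h = Hello()
g = pickle.loads(pickle.dumps(h.f))
print(g(3, 4))                 # 12
print(g.__self__ is h)         # False
g.__self__.result_list.append(12)
print(h.result_list)           # [] because the copy has its own list

# A lambda has no importable name, so pickling it still fails.
try:
    pickle.dumps(lambda x: x)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

# 2. apply_async never raises a task's exception itself.
def risky(x):
    # Hypothetical task that fails for one input.
    if x == 3:
        raise ValueError("bad input: %d" % x)
    return x * x

h2 = Hello()
errors = []
pool = ThreadPool(4)
pending = [pool.apply_async(risky, (i,), callback=h2.log_result,
                            error_callback=errors.append)  # Python 3.2+
           for i in range(5)]
pool.close()
pool.join()

ordered = []
for i, res in enumerate(pending):
    try:
        ordered.append(res.get())  # re-raises the task's exception
    except ValueError as exc:
        print("task %d failed: %s" % (i, exc))

print(sorted(h2.result_list))  # [0, 1, 4, 16]: callback skipped for x == 3
print(ordered)                 # [0, 1, 4, 16] in submission order
print(len(errors))             # 1
```

The shuffled callback output shown above is expected: callbacks fire in completion order, whereas calling get() on the saved AsyncResult objects returns results in submission order. Python 2 has no error_callback, so there, calling get() is how you see the PicklingError.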
