Locane

Reputation: 3134

multiprocessing.apply_async does not actually start the worker when passed a queue.Queue object

I am really frustrated. Why doesn't Python's multiprocessing.apply_async() actually START the process when a queue object is passed as an argument or as part of an argument?

This code works as expected:

#! /usr/bin/env python3

import multiprocessing
import queue
import time

def worker(var):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)

pool = multiprocessing.Pool(20)
m = multiprocessing.Manager()
q = queue.Queue()

for i in range(20):
    pool.apply_async(worker, (i,))

print("kicked off workers")
pool.close()
pool.join()

But just by additionally passing the queue q, nothing happens when you run it:

#! /usr/bin/env python3

import multiprocessing
import queue
import time

def worker(var,q):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)

pool = multiprocessing.Pool(20)
m = multiprocessing.Manager()
q = queue.Queue()

for i in range(20):
    pool.apply_async(worker, (i,q))

print("kicked off workers")
pool.close()
pool.join()

Again: super frustrating. What the hell is going on? What am I doing wrong?

Upvotes: 2

Views: 405

Answers (1)

martineau

Reputation: 123443

A queue.Queue is meant for threads within a single process and cannot be pickled (it contains thread locks), so the task can never be sent to the worker processes. The failure is silent because apply_async() only reports errors when you call get() on the AsyncResult it returns. When you want to share a queue between processes, you have to create a proxy for one with multiprocessing.managers.SyncManager.Queue, via multiprocessing.Manager():

import multiprocessing
import time


def worker(var, q):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)


if __name__ == '__main__':  # Be sure to include this.

    pool = multiprocessing.Pool(20)
    mgr = multiprocessing.Manager()
    q = mgr.Queue()  # Create a proxy for a queue shared between processes.

    for i in range(20):
        pool.apply_async(worker, (i,q))

    print("kicked off workers")
    pool.close()

    print('joining pool')
    pool.join()

    print('done')

Upvotes: 1

Related Questions