snakes_on_a_keyboard

Reputation: 884

Python Queue usage works in threading but (apparently) not in multiprocessing

Many of the multiprocessing tutorials don't seem to address why the technique below works for threading but not for multiprocessing.

Why doesn't this work for multiprocessing, and what is the correct implementation of what I am trying to do? Thank you!

Threading implementation (works fine, makes sense to me):

from threading import Thread
from Queue import Queue
from time import sleep    

"""threading functions"""
def producer_thread(n):
    for x in range(10):
        thread_q.put(n)

def consumer_thread():
    while True:
        item = thread_q.get()
        print item

if __name__ == '__main__':
    thread_q = Queue()

    """works fine"""
    p_thread = Thread(target=producer_thread, args=(10,))
    c_thread = Thread(target=consumer_thread)
    c_thread.daemon=True
    p_thread.start(); c_thread.start()
    p_thread.join()
    """prevents c_thread daemon process from cancelling prematurely"""
    sleep(.001)

Output:

10
10
10
10
10
10
10
10
10
10

Multiprocessing implementation (seems to be identical to threading but doesn't work at all):

from multiprocessing import Process, freeze_support
from Queue import Queue

"""multiprocessing functions"""
def producer_process(n):
    for x in range(10):
        process_q.put(n)

def consumer_process():
    while True:
        item = process_q.get()
        print item
if __name__ == '__main__':
    freeze_support()
    process_q = Queue()        
    """computer explodes"""
    p_process = Process(target=producer_process, args=(10,))
    c_process = Process(target=consumer_process)
    c_process.daemon=True
    p_process.start(); c_process.start()
    p_process.join()

Output:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Process Process-33:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Process Process-32:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-34:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-35:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'consumer'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'producer'

Upvotes: 1

Views: 1103

Answers (3)

Juan Fco. Roco

Reputation: 1638

import Queue gives you a queue for multithreaded apps (https://docs.python.org/2/library/queue.html), not for multiprocess apps.

from multiprocessing import Queue gives you a queue for multiprocess apps: https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

According to the documentation, multiprocessing.Queue "is a near clone of Queue.Queue".

Besides multiprocessing.Queue there is also JoinableQueue, which has task_done() and join() methods in case you need them.

In your example I don't think you need a JoinableQueue. Did you try this:

from multiprocessing import (Process, Queue, freeze_support)

def producer(q, n):
    for x in range(n):
        q.put(x)
    q.put("end")


def consumer(q):
    while True:
        item = q.get()
        if item == "end":
            break
        print item

if __name__ == '__main__':
    freeze_support()
    q = Queue()
    c = Process(target=consumer, args=(q,))
    c.start()
    p = Process(target=producer, args=(q, 10))
    p.start()
    c.join()

Tested on Linux and Windows.
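If the task_done()/join() pattern does turn out to be useful, a minimal JoinableQueue sketch might look like this (the worker setup here is illustrative, not code from the original post; print() with a single argument works in both Python 2 and 3):

```python
from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        print(item)
        q.task_done()  # tell the queue this item is fully processed

if __name__ == '__main__':
    q = JoinableQueue()
    w = Process(target=worker, args=(q,))
    w.daemon = True  # the worker loops forever, so let it die with the main process
    w.start()
    for x in range(10):
        q.put(x)
    q.join()  # blocks until every put() has a matching task_done()
```

Unlike the sentinel approach above, q.join() lets the parent wait until every item has actually been processed, not merely dequeued.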

Upvotes: 2

snakes_on_a_keyboard

Reputation: 884

Okay, (sorry(?)) to answer my own question, I found a working implementation of what I was trying to do. There seems to be quite a bit of nuance going on.

First of all, multiprocessing needs its own queue class (here a multiprocessing.JoinableQueue) rather than the standard Queue.Queue.

Secondly, since the worker processes don't share the parent's global variables, the queue needs to be passed as an argument to each target function -- maybe this should have been obvious, but I overlooked it.

Thirdly, and perhaps most importantly, the child processes don't print to the interpreter's stdout -- they print to the Windows console's stdout -- so you MUST run the script from the command line if you want to see the output.

"""multiprocessing functions"""
def producer_process(n, q):
    for x in range(10):
        q.put(n)

def consumer_process(q):
    while True:
        item = q.get()
        print item
        q.task_done()

if __name__ == '__main__':
    from multiprocessing import Process, freeze_support, JoinableQueue

    freeze_support()
    process_q = JoinableQueue()        
    '''launch consumer process'''
    c_process = Process(target=consumer_process, args=(process_q,))
    c_process.daemon = True
    c_process.start()

    '''launch producer process'''
    p_process = Process(target=producer_process, args=(10, process_q))
    p_process.start()
    p_process.join()

    process_q.join()
    print "Done"

Upvotes: 0

Meng Wang

Reputation: 277

When I run the Thread version, I got:

File "test.py", line 18, in <module> 
  p_thread = Thread(target=producer, args=(10,)) 
    NameError: name 'producer' is not defined 

Also, I think one error in the multiprocessing version

NameError: global name 'q' is not defined

must come from a typo: nothing named "q" is defined in the code you posted.

EDIT: Now I ran the thread version and found that fewer than ten "10"s are printed: typically three or four, and the count changes randomly from run to run. I'm using Python 2.7.5. Can you check this issue?
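That missing output is most likely the daemon consumer being killed before it drains the queue -- the sleep(.001) at the end of the question's script is a race, not a guarantee. One way to avoid the race (a suggested variation, not code from the question) is to pair each get() with task_done() and wait on Queue.join(), so all ten items are printed before the program exits:

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

q = queue.Queue()

def producer(n):
    for _ in range(10):
        q.put(n)

def consumer():
    while True:
        item = q.get()
        print(item)
        q.task_done()       # mark this item as fully handled

producer_t = threading.Thread(target=producer, args=(10,))
consumer_t = threading.Thread(target=consumer)
consumer_t.daemon = True    # the consumer never returns, so let it die with the program
producer_t.start()
consumer_t.start()
producer_t.join()
q.join()                    # blocks until task_done() has been called for every item
```

Here q.join() replaces the sleep(): it returns only after every queued item has been consumed and printed, however long that takes.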

EDIT: I ran the multiprocessing version; there was no output or error message, and the program terminated quickly. I believe there is some issue with the logic, and it can't be ignored. I think fixing the thread version first may be of great help to you.

Upvotes: 0
