Neel

Reputation: 37

Queue not getting cleared while using Multiprocessing in Python

I have one queue that is accessed by two multiprocessing functions. Both processes are consuming the same item from the queue and then clearing it. I want each process to take a unique value only. What am I doing wrong?

import time
import queue
import multiprocessing
import threading

q = queue.Queue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one():
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two():
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

p1 = multiprocessing.Process(target=multi_one)
p2 = multiprocessing.Process(target=multi_two)

for item in range(10):
    q.put(item)

p1.start()
p2.start()

Output I am getting is:

0
0
1
1
2
2
...

Output I am looking for is:

0
1
2
3
4
5
6
7
8
9

Upvotes: 1

Views: 866

Answers (2)

alex_noname

Reputation: 32123

Your code contains several errors. I will describe them below, quoting the documentation where relevant:

  • You should protect the “entry point” of the program by using if __name__ == '__main__'

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

  • You should pass the queue object as an argument to the Process constructor.

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

  • You should use multiprocessing.Queue, or multiprocessing.JoinableQueue if you want to use JoinableQueue.task_done(), because queue.Queue only works in a multithreading context, where both the producer and the consumer are in the same process.
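To illustrate the last point, here is a minimal sketch (not part of the original post) of queue.Queue doing the job it was designed for: two consumer threads inside one process, each item taken exactly once. The names consume and run_with_threads and the use of None as a stop marker are assumptions made for this illustration.

```python
import queue
import threading


def consume(q, seen, lock):
    """Take items until a None stop marker arrives, recording each item."""
    while True:
        item = q.get()
        try:
            if item is None:  # hypothetical stop marker
                break
            with lock:
                seen.append(item)
        finally:
            # Every get(), including the stop marker, is marked as done.
            q.task_done()


def run_with_threads(n_items=10, n_threads=2):
    """Share one queue.Queue between threads and return the consumed items."""
    q = queue.Queue()
    seen = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=consume, args=(q, seen, lock))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for item in range(n_items):
        q.put(item)
    for _ in threads:
        q.put(None)  # one stop marker per thread
    q.join()  # returns once task_done() has been called for every put()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    print(sorted(run_with_threads()))  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Threads share the memory of a single process, so both consumers see the same queue object. A separate process does not share that memory, which is why each process in the question ends up draining its own copy.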

Considering the above notes, your code can be modified in this way (although it is still far from ideal):

import time
import multiprocessing

q = multiprocessing.JoinableQueue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one(q):
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two(q):
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=multi_one, args=(q, ))
    p2 = multiprocessing.Process(target=multi_two, args=(q, ))

    for item in range(10):
        q.put(item)

    p1.start()
    p2.start()

Output:

0
1
2
3
...
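One reason the version above is still far from ideal is that both workers loop forever on a blocking q.get(), so the program never exits on its own. A possible sketch of a cleaner shape follows, assuming a single worker function, None as a stop marker, and a second queue that carries results back to the parent. The names worker, consume, SENTINEL, tasks and results are hypothetical and do not appear in the original code.

```python
import multiprocessing

SENTINEL = None  # hypothetical stop marker, not part of the original code


def worker(name, tasks, results):
    """Consume items until the stop marker arrives, reporting each one."""
    while True:
        item = tasks.get()
        try:
            if item is SENTINEL:
                break
            results.put((name, item))
        finally:
            # Every get(), including the stop marker, is marked as done.
            tasks.task_done()


def consume(items, n_workers=2):
    """Feed items to n_workers processes and return (worker name, item) pairs."""
    items = list(items)
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(target=worker, args=(f"worker-{i}", tasks, results))
        for i in range(n_workers)
    ]
    for p in procs:
        p.start()
    for item in items:
        tasks.put(item)
    for _ in procs:
        tasks.put(SENTINEL)  # one stop marker per worker
    tasks.join()  # blocks until task_done() has been called for every put()
    # Drain the results before joining the workers.
    seen = [results.get() for _ in items]
    for p in procs:
        p.join()
    return seen


if __name__ == "__main__":
    seen = consume(range(10))
    print(sorted(item for _, item in seen))  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Which worker receives which item varies from run to run, but each item is delivered to exactly one worker, and the parent returns once all workers have seen their stop marker.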

Upvotes: 2

Andrej Kesely

Reputation: 195438

You're using the wrong type of Queue. Try changing it to multiprocessing.JoinableQueue:

import time
import multiprocessing

q = multiprocessing.JoinableQueue(maxsize=0)
run_1 = 1
run_2 = 1

def multi_one(q):
    while run_1 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

def multi_two(q):
    while run_2 == 1:
        item = q.get()
        q.task_done()
        time.sleep(2)
        print(item)

for item in range(10):
    q.put(item)

p1 = multiprocessing.Process(target=multi_one, args=(q, ))
p2 = multiprocessing.Process(target=multi_two, args=(q, ))

p1.start()
p2.start()

Prints:

0
1
2
3
4
5
6
7
8
9

Upvotes: 1
