Reputation: 1021
It's a producer and worker workflow using multiprocessing and gevent together. I want to share some data between processes with a multiprocessing Queue, and at the same time the gevent producers and workers in each process get data from and put tasks onto those queues.
task1_producer generates some data and puts it into q1; task1_worker consumes the data from q1 and puts the generated data into q2 and q3.
Then task2 does the same.
The problem is that data has been inserted into q2 and q3, but nothing happens in task2. If you add some logs in task2, you will find that q3 is empty. Why does this happen? And what's the best way to share data between processes?
from multiprocessing import Value, Process, Queue
# from gevent.queue import Queue
from gevent import monkey, spawn, joinall
monkey.patch_all()  # Magic!
import requests
import json
import time
import logging
from logging.config import fileConfig

def configure():
    logging.basicConfig(level=logging.DEBUG,
                        format="%(asctime)s - %(module)s - line %(lineno)d - process-id %(process)d - (%(threadName)-5s)- %(levelname)s - %(message)s")
    # fileConfig(log_file_path)
    return logging

logger = configure().getLogger(__name__)
def task2(q2, q3):
    crawl = task2_class(q2, q3)
    crawl.run()

class task2_class:
    def __init__(self, q2, q3):
        self.q2 = q2
        self.q3 = q3

    def task2_producer(self):
        while not self.q2.empty():
            logger.debug("comment_weibo_id_queue not empty")
            task_q2 = self.q2.get()
            logger.debug("task_q2 is {}".format(task_q2))
            self.q3.put(task_q2)

    def worker(self):
        while not self.q3.empty():
            logger.debug("q3 not empty")
            data_q3 = self.q3.get()
            print(data_q3)

    def run(self):
        spawn(self.task2_producer).join()
        joinall([spawn(self.worker) for _ in range(40)])
def task1(user_id, q1, q2, q3):
    task = task1_class(user_id, q1, q2, q3)
    task.run()

class task1_class:
    def __init__(self, user_id, q1, q2, q3):
        self.user_id = user_id
        self.q1 = q1
        self.q2 = q2
        self.q3 = q3
        logger.debug(self.user_id)

    def task1_producer(self):
        for data in range(20):
            self.q1.put(data)
            logger.debug("{} has been put into q1".format(data))

    def task1_worker(self):
        while not self.q1.empty():
            data = self.q1.get()
            logger.debug("task1_worker data is {}".format(data))
            self.q2.put(data)
            logger.debug("{} has been inserted to q2".format(data))
            self.q3.put(data)
            logger.debug("{} has been inserted to q3".format(data))

    def run(self):
        spawn(self.task1_producer).join()
        joinall([spawn(self.task1_worker) for _ in range(40)])
if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    p2 = Process(target=task1, args=("user_id", q1, q2, q3,))
    p3 = Process(target=task2, args=(q2, q3))
    p2.start()
    p3.start()
    p2.join()
    p3.join()
Some logs:
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-12)- DEBUG - 10 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-13)- DEBUG - 11 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-14)- DEBUG - 12 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-15)- DEBUG - 13 has been inserted to q3
2017-05-17 13:46:40,222 - demo - line 78 - process-id 13269 - (DummyThread-16)- DEBUG - 14 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-17)- DEBUG - 15 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-18)- DEBUG - 16 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-19)- DEBUG - 17 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-20)- DEBUG - 18 has been inserted to q3
2017-05-17 13:46:40,223 - demo - line 78 - process-id 13269 - (DummyThread-21)- DEBUG - 19 has been inserted to q3
[Finished in 0.4s]
Upvotes: 2
Views: 1038
Reputation: 586
gevent's patch_all is incompatible with multiprocessing.Queue. Specifically, patch_all calls patch_thread by default, and patch_thread is documented to have issues with multiprocessing.Queue.
If you want to use multiprocessing.Queue, you can pass thread=False as an argument to patch_all, or just use the specific patch functions that you need, e.g., patch_socket(). (This assumes that you don't need monkey-patched threads, of course, which your example doesn't use.)
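For example, here is a minimal sketch of how the import block from the question could be adjusted, assuming (as above) that nothing in the program needs monkey-patched threads:
import requests  # patched for cooperative I/O by patch_all below
from multiprocessing import Process, Queue
from gevent import monkey, spawn, joinall

# Patch everything except the thread module, so multiprocessing.Queue
# keeps its real locks and condition variables and still works
# across process boundaries.
monkey.patch_all(thread=False)

# Alternatively, patch only what the code actually needs, e.g. just
# the socket module for cooperative network I/O:
# monkey.patch_socket()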
Alternatively, you could consider an external queue like Redis, or pass data directly across (probably Unix domain) sockets, which is what multiprocessing.Queue does under the covers. Admittedly, both are more complex.
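For illustration, a minimal sketch of the Redis option using the redis-py client; the list name task_queue and the connection details are arbitrary choices for this example:
import redis

r = redis.Redis(host="localhost", port=6379)

# Producer side: push tasks onto a Redis list visible to every process.
for data in range(20):
    r.lpush("task_queue", data)

# Worker side (any process, or even another machine):
while True:
    # brpop blocks until an item is available and returns (key, value).
    _, item = r.brpop("task_queue")
    print(item.decode())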
Upvotes: 2