keddad

Why might my multiprocessing queue be "losing" items?

I have some code that shares objects between processes using a queue. The parent:

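import multiprocessing as mp

processing_manager = mp.Manager()
to_cacher = processing_manager.Queue()
fetchers = get_fetchers()  # my own helper, not shown
fetcher_process = mp.Process(target=fetch_news, args=(to_cacher, fetchers))
fetcher_process.start()
while True:
    # get() blocks until an item is available; nothing ever ends this loop
    print(to_cacher.get())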

And a child:

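from queue import Queue  # manager.Queue() returns a proxy to a queue.Queue
from typing import List

# Fetcher is my own class, not shown
def fetch_news(pass_to: Queue, fetchers: List[Fetcher]):
    def put_news_to_query(pass_to: Queue, fetchers: List[Fetcher]):
        for fet in fetchers:
            for news in fet.latest_news():
                print(news)
                pass_to.put(news)
        print("----------------")

    put_news_to_query(pass_to, fetchers)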

I expect to see N objects printed by put_news_to_query, then the separator line, and then the same objects printed by the while loop in the parent. The problem is that objects seem to go missing: if, say, 8 objects are printed in put_news_to_query, only 2-3 objects are printed by the while loop. What am I doing wrong?


Answers (2)

keddad

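So, apparently, nothing was lost: it was about the order in which the put and get calls ran. The parent's get() returns as soon as an item is in the queue, so the parent started printing objects while the child was still producing them, and some of the parent's prints appeared before the separator line. If you run into something like this, I'd recommend labeling each print so you can tell which process produced it, like this: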

print(f"Worker: {news}")
print(f"Main: {to_cacher.get()}")
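A minimal runnable sketch of that labeling approach, assuming the parent knows in advance how many items the child will send. The `run(items)` function and the list of integers standing in for the news objects are hypothetical. `mp.current_process().name` supplies the label, and collecting what the parent receives shows whether anything was actually dropped:

```python
import multiprocessing as mp


def worker(pass_to, items):
    # Tag every line with the printing process's name ("Worker" below).
    name = mp.current_process().name
    for item in items:
        print(f"{name}: put {item}", flush=True)
        pass_to.put(item)
    print(f"{name}: ----------------", flush=True)


def run(items):
    with mp.Manager() as manager:
        to_cacher = manager.Queue()
        proc = mp.Process(target=worker, args=(to_cacher, items), name="Worker")
        proc.start()
        received = []
        # The parent knows how many items to expect, so it stops after that many.
        for _ in range(len(items)):
            item = to_cacher.get()  # returns as soon as any item is available
            print(f"Main: got {item}", flush=True)
            received.append(item)
        proc.join()  # wait for the child to exit before the manager shuts down
    return received


if __name__ == "__main__":
    got = run(list(range(8)))
    print(len(got), "items received")
```

Run it a few times: the `Main:` lines can show up before `Worker: ----------------`, yet all 8 items arrive in order. `flush=True` makes each line appear as soon as it is printed; it does not force the two processes' output into any fixed order.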


tdelaney

This isn't really an answer, unless the answer is that the code already works. I modified the code into a runnable example of the same technique, and the data gets from the child to the parent without loss. The child puts None after its last item as a sentinel, so the parent knows when to stop reading.

import multiprocessing as mp
import time
import random

def worker(pass_to):
    for i in range(10):
        # random 0-10 ms pause to mimic slow fetching
        time.sleep(random.randint(0, 10) / 1000)
        print('child', i)
        pass_to.put(i)
    print("---------------------")
    pass_to.put(None)  # sentinel: no more items

def main():
    manager = mp.Manager()
    to_cacher = manager.Queue()
    fetcher = mp.Process(target=worker, args=(to_cacher,))
    fetcher.start()
    while True:
        msg = to_cacher.get()
        if msg is None:  # the child has finished sending
            break
        print(msg)
    fetcher.join()  # wait for the child process to exit

if __name__ == "__main__":
    main()
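The same sentinel pattern can be written a bit more compactly. This is a sketch, not the code above: the `with mp.Manager()` block shuts the manager down when it exits, the built-in two-argument `iter(callable, sentinel)` keeps calling `to_cacher.get()` until it returns `None`, and `join()` waits for the child. The `items` parameter is a hypothetical stand-in for the fetched news:

```python
import multiprocessing as mp

SENTINEL = None  # marks "no more items"; must never be a legitimate item


def worker(pass_to, items):
    for item in items:
        pass_to.put(item)
    pass_to.put(SENTINEL)  # tell the parent we are done


def main(items):
    with mp.Manager() as manager:
        to_cacher = manager.Queue()
        fetcher = mp.Process(target=worker, args=(to_cacher, items))
        fetcher.start()
        # iter(callable, sentinel) calls to_cacher.get() until it returns SENTINEL.
        received = list(iter(to_cacher.get, SENTINEL))
        fetcher.join()  # reap the child once it has finished
    return received


if __name__ == "__main__":
    print(main(list(range(10))))  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

`iter` stops at the first value that compares equal to the sentinel, so pick one that can never be a real item; `None` works as long as a fetcher never yields `None`.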
