Relax ZeroC
Relax ZeroC

Reputation: 683

python - multiprocessing with queue

Here is my code below , I put string in queue , and hope dowork2 to do something work , and return char in shared_queue

but I always get nothing at while not shared_queue.empty()

please give me some point , thanks.

import time
import multiprocessing as mp


class Test(mp.Process):

    def __init__(self, **kwargs):
        mp.Process.__init__(self)
        self.daemon = False
        print('dosomething')

    def run(self):
        manager = mp.Manager()
        queue = manager.Queue()
        shared_queue = manager.Queue()
        # shared_list = manager.list()
        pool = mp.Pool()
        results = []
        results.append(pool.apply_async(self.dowork2,(queue,shared_queue)))
        while True:
            time.sleep(0.2)
            t =time.time()
            queue.put('abc')
            queue.put('def')
            l = ''
            while not shared_queue.empty():
                l = l + shared_queue.get()
            print(l)
            print( '%.4f' %(time.time()-t))

        pool.close()
        pool.join()

    def dowork2(queue,shared_queue):
        while True:
            path = queue.get()
            shared_queue.put(path[-1:])

if __name__ == '__main__':
    t = Test()
    t.start()
    # t.join()
    # t.run()

Upvotes: 0

Views: 68

Answers (1)

Hannu
Hannu

Reputation: 12205

I managed to get it work by moving your dowork2 outside the class. If you declare dowork2 as a function before Test class and call it as

results.append(pool.apply_async(dowork2, (queue, shared_queue)))

it works as expected. I am not 100% sure but it probably goes wrong because your Test class is already subclassing Process. Now when your pool creates a subprocess and initialises the same class in the subprocess, something gets overridden somewhere.

Overall I wonder if Pool is really what you want to use here. Your worker seems to be in an infinite loop indicating you do not expect a return value from the worker, only the result in the return queue. If this is the case, you can remove Pool.

I also managed to get it work keeping your worker function within the class when I scrapped the Pool and replaced with another subprocess:

foo = mp.Process(group=None, target=self.dowork2, args=(queue, shared_queue))
foo.start()
# results.append(pool.apply_async(Test.dowork2, (queue, shared_queue)))
while True:
    ....

(you need to add self to your worker, though, or declare it as a static method:)

def dowork2(self, queue, shared_queue):

Upvotes: 1

Related Questions