FlyingZebra1

Reputation: 1346

Joining output of multiprocessing workers - python2

I have an [abstracted] multiprocessing script below, where I'm trying to:

1) split the workload between two processes (add 1 to each value in a list and append the result to a new list)

2) join the output of both processes into a new global list for future processing.

How can I take the output of both processes and join it together into one global list? What I want to end up with, after execution, is:

new_id_list = [2, 4, 6, 8, 10, 3, 5, 7, 9, 11] #new_id_list from worker1 + new_id_list from worker2

#python2
from multiprocessing import Process, Queue
from time import sleep

#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]

#new output id list
new_id_list = []

queue = Queue()

def mp_worker(queue):

    while queue.qsize() > 0:
        record = queue.get()
        new_id_list.append(record+1)
        sleep(.1)
    print(new_id_list)
    ### how would I go about passing this new_id_list back as the global variable?
    print("worker closed")

def mp_handler():

    # Spawn two processes, assigning the method to be executed 
    # and the input arguments (the queue)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(2)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()



if __name__ == '__main__':

    for id in id_list:
        queue.put(id)
    mp_handler()

Upvotes: 0

Views: 162

Answers (2)

FlyingZebra1

Reputation: 1346

Found an article here regarding the solution.

Working code below. Basically:

1) use multiprocessing.Manager()

2) generate a list via the Manager

3) pass that list to each worker, then have each worker append its output back to the list.

from multiprocessing import Process, Queue
from time import sleep
import multiprocessing

#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]

#new output id list (note: module-level globals are per-process, so each worker sees its own copy)
new_id_list = []

queue = Queue()

def mp_worker(d,queue):

    while queue.qsize() > 0:
        record = queue.get()
        new_id_list.append(record+1)  # only visible inside this worker
        d.append(record+1)            # shared across workers via the Manager
        sleep(.1)
    print(new_id_list)
    print("worker closed")

def mp_handler():

    # Spawn two processes, assigning the method to be executed
    # and the input arguments (the Manager list and the queue)
    processes = [Process(target=mp_worker, args=(d, queue)) for _ in range(2)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()



if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.list()
    for id in id_list:
        queue.put(id)
    mp_handler()
    print(d)  # the combined output from both workers

Upvotes: 1

andreihondrari

Reputation: 5833

I assume the issue you encountered was the inability of both processes to share new_id_list.

What you need to do is create another Queue to serve as the result queue and pass it to both processes. Each process puts its results on that queue as needed, and once both processes have finished (after the process.join() calls) you simply drain everything from the queue into a list.
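A minimal sketch of that approach, reusing the setup from the question (the task_queue/result_queue names and the two-worker count are my own illustrative choices):

#python2
from multiprocessing import Process, Queue
from Queue import Empty   # exception raised by get_nowait(); queue.Empty in python3
from time import sleep

#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]

def mp_worker(task_queue, result_queue):
    # get_nowait() avoids blocking forever if the other worker
    # grabs the last item between a qsize() check and a get()
    while True:
        try:
            record = task_queue.get_nowait()
        except Empty:
            break
        result_queue.put(record + 1)
        sleep(.1)
    print("worker closed")

def mp_handler(task_queue, result_queue):
    processes = [Process(target=mp_worker, args=(task_queue, result_queue)) for _ in range(2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

if __name__ == '__main__':
    task_queue = Queue()
    result_queue = Queue()
    for id in id_list:
        task_queue.put(id)
    mp_handler(task_queue, result_queue)

    # drain the result queue into an ordinary list after both joins;
    # the exact order depends on how the workers interleave
    new_id_list = []
    while not result_queue.empty():
        new_id_list.append(result_queue.get())
    print(new_id_list)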

Upvotes: 1
