schmat_90

Reputation: 612

How to put() and get() from a multiprocessing.Queue() at the same time?

I'm working on a Python 2.7 program that uses multiprocessing to perform these actions in parallel: read records pairwise from two input files, process each pair, and write the results to an output file.

I am new to multiprocessing and not very experienced with Python in general, so I have read a lot of existing questions and tutorials. I feel close to a solution, but I am probably missing something that I can't spot.

The code is structured like this:

import multiprocessing as mp
from itertools import izip
from multiprocessing import Queue, Process, Lock

nthreads = int(mp.cpu_count())
outq = Queue(nthreads)
l = Lock()

def func(record_1, record_2):
    result = # do stuff
    outq.put(result)

OUT = open("outputfile.txt", "w")
IN1 = open("infile_1.txt", "r")
IN2 = open("infile_2.txt", "r")

processes = []
for record_1, record_2 in izip(IN1, IN2):
    proc = Process(target=func, args=(record_1, record_2))
    processes.append(proc)
    proc.start()

for proc in processes:
    proc.join()

while not outq.empty():
    l.acquire()
    item = outq.get()
    OUT.write(item)
    l.release()

OUT.close()
IN1.close()
IN2.close()

To my understanding (so far) of the multiprocessing package, what I'm doing is: spawning one process per pair of input records, letting each process put its result on the shared queue, and then, after joining all the processes, draining the queue into the output file.

Now, my problem is that when I run this script it immediately turns into a zombie process. I know the function itself works, because without the multiprocessing implementation I got the results I wanted.

I'd like to read from the two files and write to the output at the same time, to avoid building a huge list from my input files and then iterating over it (the input files are huge). Do you see anything gross, completely wrong, or improvable?

Upvotes: 0

Views: 2711

Answers (1)

CasualDemon

Reputation: 6160

The biggest issue I see is that you should pass the queue object to the worker process as an argument instead of trying to use it as a global in your function.

def func(record_1, record_2, queue):
    result = # do stuff
    queue.put(result)

for record_1, record_2 in izip(IN1, IN2):
    proc = Process(target=func, args=(record_1, record_2, outq))

Also, as currently written, you would still be pulling all of that information into memory (i.e., into the queue) and waiting for the reads to finish before writing to the output file. You need to move the proc.join loop to after you have read through the queue, and instead of putting all of the information on the queue at the end of func, it should fill the queue in chunks, in a loop, over time; otherwise it's the same as just reading it all into memory.
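Put together, a minimal sketch of that shape might look like this (the record_1 + record_2 line is a placeholder for your real work, and it keeps your one-process-per-line-pair structure; for really huge inputs you would want a fixed pool of workers instead):

from itertools import izip
from multiprocessing import Process, Queue

def func(record_1, record_2, queue):
    result = record_1 + record_2  # placeholder for the real work
    queue.put(result)

if __name__ == "__main__":
    outq = Queue()
    processes = []

    with open("infile_1.txt") as IN1, open("infile_2.txt") as IN2:
        for record_1, record_2 in izip(IN1, IN2):
            proc = Process(target=func, args=(record_1, record_2, outq))
            processes.append(proc)
            proc.start()

    # Each worker puts exactly one item, so read that many items
    # *before* joining; otherwise the children can block on the
    # queue while the parent blocks in join(), and you deadlock.
    with open("outputfile.txt", "w") as OUT:
        for _ in processes:
            OUT.write(outq.get())

    for proc in processes:
        proc.join()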

You also don't need a lock unless you are using it inside the worker function func, and if you do, you will again want to pass it in as an argument.
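For example (hypothetical — this is only relevant if you decided to have the workers append to the output file directly instead of using the queue):

from multiprocessing import Lock, Process

def func(record_1, record_2, lock):
    result = record_1 + record_2  # placeholder for the real work
    with lock:  # serialize appends so lines from different workers don't interleave
        with open("outputfile.txt", "a") as out:
            out.write(result)

l = Lock()
proc = Process(target=func, args=(record_1, record_2, l))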

If you don't want to read / store a lot in memory, I would write out at the same time I am iterating through the input files. Here is a basic example of combining each line of the files together.

with open("infile_1.txt") as infile1, open("infile_2.txt") as infile2, open("out", "w") as outfile:
    for line1, line2 in zip(infile1, infile2):
        outfile.write(line1 + line2)
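(I'm using itertools.izip rather than zip here because on Python 2 zip would first build the whole list of pairs in memory, which defeats the point; izip pairs the lines lazily, so this streams through both files.)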

I don't want to write too much about all of these, just trying to give you ideas. Let me know if you want more detail about something. Hope it helps!

Upvotes: 1
