Reputation: 275
I am trying to use multiprocessing to read each line of a text file. There are 660918 lines, all of which I know to be the same length. With the following code, however, the length of the lines seems to change, and I cannot figure out why.
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self,in_q):
        multiprocessing.Process.__init__(self)
        self.in_q = in_q
    def run(self):
        while True:
            try:
                in_q.get()
                temp_line = short_file.readline()
                temp_line = temp_line.strip().split()
                print len(temp_line)
                self.in_q.task_done()
            except:
                break

if __name__ == "__main__":
    num_proc = 10
    lines = 100000 #660918 is how many lines there actually are
    in_q = multiprocessing.JoinableQueue()
    File = 'HGDP_FinalReport_Forward.txt'
    short_file = open(File)
    for i in range(lines):
        in_q.put(i)
    for i in range(num_proc):
        worker = Worker(in_q)
        worker.start()
    in_q.join()
Upvotes: 2
Views: 190
Reputation: 275
So, I changed it to use Pool, and it seems to work. Is the following better?
import multiprocessing as mp

File = 'HGDP_FinalReport_Forward.txt'
#short_file = open(File)
test = []

def pro(temp_line):
    temp_line = temp_line.strip().split()
    return len(temp_line)

if __name__ == "__main__":
    with open("HGDP_FinalReport_Forward.txt") as lines:
        pool = mp.Pool(processes = 10)
        t = pool.map(pro,lines.readlines())
    print t
Upvotes: 1
Reputation: 366073
You're opening a file in the main process, then reading from that file in the child processes. You can't do that.
Deep under the covers, the file object is effectively a raw file handle and a memory buffer. Each process shares the file handle, but each one has its own memory buffer.
Let's say all of the lines are 50 bytes each, and the memory buffer is 4096 bytes.
Process 1 calls readline, which reads bytes 0-4095 from the file into its buffer, then scans that buffer for a newline, which is 50 bytes in, and it returns the first 50 bytes. So far, so good.
Process 2 calls readline, which reads bytes 4096-8191 from the file into its buffer, then scans that buffer for a newline. The first one ends the line that occupies bytes 4050-4099, so it is only 4 bytes into the buffer, and readline returns those 4 bytes: the tail end of a line, not a whole one.
And so on.
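The scenario above can be mimicked in a single process with a toy model. This is only a sketch: SharedHandle and PrivateBufferReader are hypothetical stand-ins for the OS-level file offset and for each process's buffered file object, not real library classes, and the 50-byte lines (newline included) and 4096-byte buffer are just the illustrative numbers used above. Under those assumptions the second reader gets a 4-byte fragment; the exact size depends on whether the newline is counted in the 50 bytes, but either way it is not a whole line.

```python
LINE_LEN = 50    # bytes per line, newline included (assumption)
BUF_SIZE = 4096  # buffer size used in the explanation (assumption)

# 200 identical lines of 49 'x' characters plus a newline.
data = b"".join(b"x" * (LINE_LEN - 1) + b"\n" for _ in range(200))


class SharedHandle:
    """Toy stand-in for the file offset that all processes share."""

    def __init__(self, data):
        self.data = data
        self.pos = 0

    def read(self, n):
        chunk = self.data[self.pos:self.pos + n]
        self.pos += len(chunk)  # every reader advances the same offset
        return chunk


class PrivateBufferReader:
    """Toy stand-in for one process's file object: own buffer, shared handle."""

    def __init__(self, handle):
        self.handle = handle
        self.buf = b""

    def readline(self):
        # Refill from the shared handle only when no full line is buffered.
        if b"\n" not in self.buf:
            self.buf += self.handle.read(BUF_SIZE)
        line, sep, rest = self.buf.partition(b"\n")
        self.buf = rest
        return line + sep


handle = SharedHandle(data)
proc1 = PrivateBufferReader(handle)
proc2 = PrivateBufferReader(handle)

first = proc1.readline()   # buffer holds bytes 0-4095
second = proc2.readline()  # buffer starts at byte 4096, mid-line
print(len(first), len(second))
```

After strip().split(), a fragment like the second one has fewer fields than a full line, which matches the changing lengths reported in the question.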
You could theoretically get around this by doing unbuffered I/O, but really, why? Why not just read the lines in your main process? Besides avoiding this problem, that would also probably improve parallelism—the I/O is inherently sequential, so all of those processes will spend most of their time blocked on I/O, which means they're not doing you any good.
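One way to read the lines in the main process and still spread the per-line work across processes might look like the following. It is a sketch written for Python 3 (the code in this thread is Python 2); count_fields and count_fields_parallel are names made up for illustration, and the processes and chunksize values are arbitrary guesses, not tuned settings.

```python
import multiprocessing as mp
import sys


def count_fields(line):
    """Return the number of whitespace-separated fields in one line."""
    return len(line.split())


def count_fields_parallel(path, processes=4, chunksize=1000):
    """Read the file in the parent only; workers receive plain strings."""
    with open(path) as fh, mp.Pool(processes=processes) as pool:
        # imap pulls lines from fh in the parent and ships them to the
        # workers in batches of chunksize; results keep the input order.
        return list(pool.imap(count_fields, fh, chunksize=chunksize))


if __name__ == "__main__" and len(sys.argv) > 1:
    counts = count_fields_parallel(sys.argv[1])
    # If every line really has the same layout, this set has one element.
    print(set(counts))
```

Because the open file is iterated only in the parent, no child process ever touches the file object, so the shared-handle problem cannot occur.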
As a side note, near the top of the loop in run, you're doing in_q.get() instead of self.in_q.get(). (That happens to work because in_q is a global variable that never goes away and self.in_q is just a copy of it, but you don't want to rely on that.)
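The Worker design from the question can be kept if the workers receive the line text through the queue instead of reading the file themselves, and use self.in_q throughout. A rough Python 3 sketch under those assumptions; LineWorker, out_q, demo and the None sentinel are illustrative choices, not part of the original code.

```python
import multiprocessing


class LineWorker(multiprocessing.Process):
    """Counts fields in lines taken from in_q; never touches the file."""

    def __init__(self, in_q, out_q):
        super().__init__()
        self.in_q = in_q
        self.out_q = out_q

    def run(self):
        while True:
            line = self.in_q.get()  # always through self, not a global
            try:
                if line is None:    # sentinel: no more work for this worker
                    break
                self.out_q.put(len(line.split()))
            finally:
                self.in_q.task_done()


def demo(lines, num_proc=2):
    """Feed a list of lines to the workers and collect the field counts."""
    in_q = multiprocessing.JoinableQueue()
    out_q = multiprocessing.Queue()
    workers = [LineWorker(in_q, out_q) for _ in range(num_proc)]
    for w in workers:
        w.start()
    for line in lines:              # the parent is the only reader
        in_q.put(line)
    for _ in workers:
        in_q.put(None)              # one sentinel per worker
    in_q.join()
    counts = [out_q.get() for _ in lines]  # drain before joining workers
    for w in workers:
        w.join()
    return counts
```

demo should be called from under an if __name__ == "__main__": guard, as in the question. Results come back in whatever order the workers finish, so they are not guaranteed to match the input order.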
Upvotes: 7