Reputation: 298492
I've never worked with multiprocessing before, so bear with me if I'm asking a basic question.
This answer provided a very nice processing class that I adapted to my needs, and it works very well. I'm trying to implement a basic progress bar, which I'm testing using print statements, but it is not working at all (no output whatsoever).
My current code is this:
class ParsingMaster(object):
    def __init__(self, parser, input_file, output_file):
        self.parser = parser
        self.num_processes = cpu_count()
        self.input_file = input_file
        self.output_file = output_file
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.input_size = 0
        self.input_process = Process(target=self.parse_input)
        self.output_process = Process(target=self.write_output)
        self.processes = [Process(target=self.process_row) for row in range(self.num_processes)]
        self.input_process.start()
        self.output_process.start()
        for process in self.processes:
            process.start()
        self.input_process.join()
        for process in self.processes:
            process.join()
        self.output_process.join()

    def parse_input(self):
        for index, row in enumerate(self.input_file):
            self.input_queue.put([index, row])
            self.input_size = self.input_queue.qsize()
        for i in range(self.num_processes):
            self.input_queue.put('STOP')

    def process_row(self):
        for index, row in iter(self.input_queue.get, 'STOP'):
            self.output_queue.put([index, row[0], self.parser.parse(row[1])])
        self.output_queue.put('STOP')

    def write_output(self):
        current = 0
        buffer = {}
        for works in range(self.num_processes):
            for index, id, row in iter(self.output_queue.get, 'STOP'):
                if index != current:
                    buffer[index] = [id] + row
                else:
                    self.output_file.writerow([id] + row)
                    current += 1
                    while current in buffer:
                        self.output_file.writerow(buffer[current])
                        del buffer[current]
                        current += 1
                if self.input_size:
                    print float(current * 100) / float(self.input_size)
After some testing, I've found some strange things:
- self.input_size is updated properly in parse_input().
- parse_input() ends while write_output() is still running.
- write_output() always reports that self.input_size = 0.
Can anyone tell me where I'm going wrong here? Any help is helpful, so thank you in advance.
Upvotes: 0
Views: 1265
Reputation: 34290
self.input_size is a process-local variable: each process has its own copy, so the value set in parse_input() is never seen by write_output(). According to the multiprocessing documentation, you need to wrap your data in containers like Value and Array to make it shared.
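As a rough illustration, here is a minimal sketch of sharing a counter with Value. It is written for Python 3 (the question's code is Python 2), and count_rows and the sample rows are hypothetical stand-ins, not part of the ParsingMaster class. The shared object has to be created before the processes start and handed to them, for example through args.

```python
from multiprocessing import Process, Value


def count_rows(rows, total):
    """Hypothetical worker: counts rows into a shared integer."""
    for _ in rows:
        # get_lock() guards the read-modify-write against other writers.
        with total.get_lock():
            total.value += 1


if __name__ == "__main__":
    rows = ["a", "b", "c"]   # stand-in for the input file
    total = Value("i", 0)    # shared C int, initially 0

    worker = Process(target=count_rows, args=(rows, total))
    worker.start()
    worker.join()

    # The parent sees the child's updates because the Value is shared.
    print(total.value)  # prints 3
```

In the class from the question, this would presumably mean making self.input_size a Value("i", 0) in __init__, writing to it through .value in parse_input(), and reading .value in write_output().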
Upvotes: 2