Blender

Reputation: 298492

Multiprocessing with progress

I've never worked with multiprocessing before so bear with me if I'm asking a basic question.

This answer provided a very nice processing class that I adapted to my needs and it works very well. I'm trying to implement a basic progress bar which I'm testing using print statements, but it is not working at all (no output whatsoever).

My current code is this:

class ParsingMaster(object):
  def __init__(self, parser, input_file, output_file):
    self.parser = parser

    self.num_processes = cpu_count()
    self.input_file = input_file
    self.output_file = output_file

    self.input_queue = Queue()
    self.output_queue = Queue()

    self.input_size = 0

    self.input_process = Process(target=self.parse_input)
    self.output_process = Process(target=self.write_output)
    self.processes = [Process(target=self.process_row) for row in range(self.num_processes)]

    self.input_process.start()
    self.output_process.start()

    for process in self.processes:
      process.start()

    self.input_process.join()

    for process in self.processes:
      process.join()

    self.output_process.join()

  def parse_input(self):
    for index, row in enumerate(self.input_file):
      self.input_queue.put([index, row])
      self.input_size = self.input_queue.qsize()

    for i in range(self.num_processes):
      self.input_queue.put('STOP')

  def process_row(self):
    for index, row in iter(self.input_queue.get, 'STOP'):
      self.output_queue.put([index, row[0], self.parser.parse(row[1])])

    self.output_queue.put('STOP')

  def write_output(self):
    current = 0
    buffer = {}

    for works in range(self.num_processes):
      for index, id, row in iter(self.output_queue.get, 'STOP'):
        if index != current:
          buffer[index] = [id] + row
        else:
          self.output_file.writerow([id] + row)
          current += 1

          while current in buffer:
            self.output_file.writerow(buffer[current])
            del buffer[current]
            current += 1

            if self.input_size:
              print float(current * 100) / float(self.input_size)

After some testing, I've found some strange things:

Can anyone tell me where I'm going wrong here? Any help is appreciated, so thank you in advance.

Upvotes: 0

Views: 1265

Answers (1)

bereal

Reputation: 34290

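self.input_size is a process-local variable: each process has its own copy, so the value that parse_input updates in the input process is never seen by write_output, which runs in the output process and still sees 0. According to the multiprocessing documentation, you need to wrap your data in shared containers such as Value or Array to share it between processes.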
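
As a rough illustration of the difference, here is a minimal sketch written for Python 3. It is not the code from the question: count_rows, run_demo and the plain dict are hypothetical names made up for this example. A child process increments both an ordinary counter and a multiprocessing Value, and the parent then reads both back.

```python
import multiprocessing as mp


def count_rows(rows, shared_total, plain):
    """Runs in the child process (hypothetical helper for this sketch)."""
    for _ in rows:
        plain["total"] += 1             # updates only the child's own copy
        with shared_total.get_lock():   # Value carries a lock by default
            shared_total.value += 1     # lives in shared memory, visible to the parent


def run_demo(n_rows):
    """Return (plain counter, shared counter) as seen by the parent."""
    # Assumption: prefer "fork" where it exists; otherwise use the platform default.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    ctx = mp.get_context(method)

    shared_total = ctx.Value("i", 0)    # "i" is a C int, starting at 0
    plain = {"total": 0}

    worker = ctx.Process(target=count_rows,
                         args=(range(n_rows), shared_total, plain))
    worker.start()
    worker.join()
    return plain["total"], shared_total.value


if __name__ == "__main__":
    print(run_demo(5))
```

In the parent, the plain counter is still 0 while the shared one holds the number of rows. Applied to the question's class, this would presumably mean creating self.input_size as a Value before the processes are started and reading or writing self.input_size.value inside parse_input and write_output.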

Upvotes: 2
