Vivek Sable

Reputation: 10213

Share Common variable in Multiprocessing

I have code that converts one custom file format to another custom file format.

I used multiprocessing to split the work across parallel processes. The file list is divided by the process count, and each slice is passed to a process as its input argument.

code:

def CustomFun1(self, ):
    start_time = time.time()
    custom1_files = os.listdir("/Some/location")
    self.total_custom1_count = len(custom1_files)
    self.custom2_ctr = 0
    self.updateReport(2, 1, self.total_custom1_count, -1, "")
    self.custom1_2_custom2_cmd = "command to convert custom1 to custom2"

    process_count = 4
    if process_count>1 and self.total_custom1_count>process_count:
        import multiprocessing
        mid = self.total_custom1_count // process_count  # integer chunk size for slicing
        processes = []
        start_index = 0
        end_index = 0 
        for ii in range(1, process_count):
            end_index += mid
            pp = multiprocessing.Process(name='custom12custom2_%s_%s'%(ii, time.time()),\
                                       target=self.createCustom1ToCustom2,\
                                       args=(custom1_files[start_index:end_index], ))
            pp.daemon = False
            processes.append(pp)
            start_index = end_index

        pp = multiprocessing.Process(name='custom12custom2_%s_%s'%(ii, time.time()),\
                                     target=self.createCustom1ToCustom2,\
                                     args=(custom1_files[start_index:], ))
        pp.daemon = False
        processes.append(pp)
        for pp in processes:
            pp.start()
        for pp in processes:
            pp.join()
    else:
        self.createCustom1ToCustom2(custom1_files)
    t2 = time.time() - start_time
    self.updateReport(2, 2, self.total_custom1_count, self.custom2_ctr, t2)

def createCustom1ToCustom2(self, custom1_files):
    """ Create Custom2 from the Custom1. """
    try:
        for cnt, custom1_file in enumerate(custom1_files, 1):
            ret = os.system(self.custom1_2_custom2_cmd%(custom1_file, custom1_file.split('.')[0]))
            self.custom2_ctr += 1 
            if self.custom2_ctr%5==0:
                self.updateReport(2, 1, self.total_custom1_count, self.custom2_ctr, "")
    except:
        e = traceback.format_exc()

The following function writes how many Custom1 files have been converted to Custom2 files.

Report Variable:

self.report = [{"pn": "Extraction", "status": 0, "cnt": 0, "tt": 0},
               {"pn": "Basic conversion Generation", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0},
               {"pn": "Cutom1 to custom2", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0}
               ]
def updateReport(self, pos, status, cnt, cur_i, tt):
    if not self.reportLoc:
        return

    try:
        self.report[pos]["status"] = status
        self.report[pos]["cnt"] = cnt
        if tt:
            self.report[pos]["tt"] = datetime.fromtimestamp(tt).strftime('%H:%M:%S')

        self.report[pos]["cur_i"] = cur_i
        with open(self.reportLoc, "w") as fp:
            fp.write(simplejson.dumps(self.report))
    except Exception:
        e = traceback.format_exc()

Multiprocessing works and I get the expected output. The process takes 30 to 40 minutes to convert 2000 files, and I need to show how many files have been converted every 30 seconds or 1 minute. For that I need the report file that is written by the function updateReport.

  1. self.custom2_ctr is not updated by the code that runs under multiprocessing: its value is still 0 at the end of function CustomFun1.
  2. In function updateReport, the processes share the common self.report variable. How can I put a lock on updateReport so that only one process can use this function at a time?

Let me know if you need more information.

Upvotes: 0

Views: 106

Answers (1)

Brainhash

Reputation: 141

You could use a lock to synchronise updateReport.

Check here: Synchronization between processes
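
A minimal sketch of that idea follows. Each child process works on its own copy of self, which is why self.custom2_ctr still reads 0 in the parent. The sketch replaces that attribute with a multiprocessing.Value and guards both the counter and the report file with a multiprocessing.Lock. The names write_report, convert_files and run_parallel, and the reduced report layout, are hypothetical and only for illustration. The actual conversion command is left as a placeholder.

```python
import json
import multiprocessing


def write_report(report_path, total, done):
    """Write progress as JSON. Call only while holding the lock."""
    with open(report_path, "w") as fp:
        json.dump({"cnt": total, "cur_i": done}, fp)


def convert_files(files, counter, lock, report_path, total):
    """Worker: process each file, then update the shared counter."""
    for name in files:
        # Placeholder: the real conversion command would run here.
        with lock:
            # Increment and report under one lock so that only one
            # process at a time touches the counter and the report file.
            counter.value += 1
            if counter.value % 5 == 0 or counter.value == total:
                write_report(report_path, total, counter.value)


def run_parallel(files, process_count, report_path):
    """Split files across processes sharing one counter and one lock."""
    counter = multiprocessing.Value("i", 0)  # integer in shared memory
    lock = multiprocessing.Lock()
    chunk = max(1, len(files) // process_count)
    processes = []
    for i in range(process_count):
        start = i * chunk
        # The last process takes whatever remains.
        end = None if i == process_count - 1 else start + chunk
        processes.append(multiprocessing.Process(
            target=convert_files,
            args=(files[start:end], counter, lock, report_path, len(files))))
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return counter.value
```

Call run_parallel from under an `if __name__ == "__main__":` guard. The counter and the lock are passed to each process as arguments, so after join() the parent can read the final count from counter.value instead of from an instance attribute.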

Upvotes: 1
