Reputation: 10213
I have code which converts one custom file format to another custom file format.
I used multiprocessing to divide the work into parallel tasks, with some divide-by-process-count logic to pass a slice of the file list as the input argument to each process.
code:
def CustomFun1(self, ):
    """Convert every Custom1 file in the source directory to Custom2.

    Splits the file list across worker processes when there are enough
    files to justify it, otherwise converts serially.  Progress and the
    total elapsed time are recorded via ``updateReport``.
    """
    start_time = time.time()
    custom1_files = os.listdir("/Some/location")
    self.total_custom1_count = len(custom1_files)
    self.custom2_ctr = 0
    # BUG FIX: was self.total_custom_count (missing the "1"), an attribute
    # that is never assigned anywhere, so this line raised AttributeError.
    self.updateReport(2, 1, self.total_custom1_count, -1, "")
    self.custom1_2_custom2_cmd = "command to convert custom1 to custom2"
    process_count = 4
    if process_count > 1 and self.total_custom1_count > process_count:
        import multiprocessing
        # Integer division: plain "/" yields a float on Python 3, which
        # breaks the slice indices computed from it below.
        mid = self.total_custom1_count // process_count
        processes = []
        start_index = 0
        end_index = 0
        for ii in range(1, process_count):
            end_index += mid
            pp = multiprocessing.Process(
                name='custom12custom2_%s_%s' % (ii, time.time()),
                target=self.createCustom1ToCustom2,
                args=(custom1_files[start_index:end_index], ))
            pp.daemon = False
            processes.append(pp)
            start_index = end_index
        # Last worker takes the remainder of the list (handles the case
        # where total_custom1_count is not a multiple of process_count).
        pp = multiprocessing.Process(
            name='custom12custom2_%s_%s' % (process_count, time.time()),
            target=self.createCustom1ToCustom2,
            args=(custom1_files[start_index:], ))
        pp.daemon = False
        processes.append(pp)
        for pp in processes:
            pp.start()
        for pp in processes:
            pp.join()
        # NOTE(review): each child process mutates its own copy of self,
        # so self.custom2_ctr remains 0 in the parent after join().  If
        # the parent needs a live counter, share a multiprocessing.Value
        # (with its Lock) between the workers instead of a plain int.
    else:
        self.createCustom1ToCustom2(custom1_files)
    t2 = time.time() - start_time
    self.updateReport(2, 2, self.total_custom1_count, self.custom2_ctr, t2)
def createCustom1ToCustom2(self, custom1_files):
    """Create Custom2 files from the given Custom1 files.

    Runs the shell command template stored in self.custom1_2_custom2_cmd
    once per input file and reports progress every 5 conversions.

    Args:
        custom1_files: list of Custom1 file names to convert.
    """
    try:
        for custom1_file in custom1_files:
            # Template expects (input file name, output base name).
            ret = os.system(self.custom1_2_custom2_cmd
                            % (custom1_file, custom1_file.split('.')[0]))
            # NOTE(review): `ret` (the command's exit status) is never
            # checked, so failed conversions are still counted — confirm
            # whether the counter should mean "attempted" or "succeeded".
            self.custom2_ctr += 1
            if self.custom2_ctr % 5 == 0:
                self.updateReport(2, 1, self.total_custom1_count,
                                  self.custom2_ctr, "")
    except Exception:
        # BUG FIX: the bare `except:` bound the traceback to an unused
        # local and silently discarded it; surface it instead.
        print(traceback.format_exc())
Following is the function in which I record how many Custom1-type files have been converted to Custom2-type files.
Report Variable:
# Pipeline progress report: one dict per stage, serialized by updateReport.
#   pn     - stage name
#   status - stage status code (call sites pass 1 while running, 2 when done)
#   cnt    - total item count for the stage
#   cur_i  - items processed so far (not tracked for the first stage)
#   tt     - elapsed time for the stage, formatted by updateReport
self.report = [{"pn": "Extraction", "status": 0, "cnt": 0, "tt": 0},
{"pn": "Basic conversion Generation", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0},
{"pn": "Cutom1 to custom2", "status": 0, "cnt": 0, "cur_i": 0, "tt": 0}
]
def updateReport(self, pos, status, cnt, cur_i, tt):
    """Persist conversion progress for pipeline stage ``pos`` to disk.

    Args:
        pos: index into self.report (which pipeline stage to update).
        status: stage status code (call sites pass 1 = running, 2 = done).
        cnt: total number of items for the stage.
        cur_i: number of items processed so far (-1 = unknown).
        tt: elapsed seconds for the stage, or "" while still running.

    Does nothing when self.reportLoc is unset.  Errors are printed rather
    than raised so that reporting can never abort a conversion run.
    """
    import json  # stdlib; local import so the file needs no other changes
    if not self.reportLoc:
        return
    try:
        self.report[pos]["status"] = status
        self.report[pos]["cnt"] = cnt
        if tt:
            # BUG FIX: tt is a *duration*, not an epoch timestamp.
            # datetime.fromtimestamp(tt) shifts it by the local UTC
            # offset (e.g. 0 s renders as "05:30:00" in IST);
            # time.gmtime formats the elapsed seconds correctly
            # (valid for durations under 24 hours).
            self.report[pos]["tt"] = time.strftime('%H:%M:%S',
                                                   time.gmtime(tt))
        self.report[pos]["cur_i"] = cur_i
        with open(self.reportLoc, "w") as fp:
            # json is stdlib and emits the same text as simplejson here.
            fp.write(json.dumps(self.report))
    except Exception:
        # Surface the error (was `except Exception, e`, Python-2-only
        # syntax, with the traceback silently discarded).
        print(traceback.format_exc())
Multiprocessing is working and I also get the expected output. This process takes 30 to 40 minutes for 2000 file conversions, and I need to show how many files have been converted every 30 seconds or 1 minute. For that I need the report file which is written in the function updateReport.
self.custom2_ctr is not updated in the code run by multiprocessing, meaning the value of self.custom2_ctr is 0 at the end of the function CustomFun1.
In updateReport, the code shares the common self.report variable. How do I set a lock on the function updateReport, so that only one process can use this function at a time? Let me know if you need more information.
Upvotes: 0
Views: 106
Reputation: 141
You could use a lock to synchronise updateReport.
Check here: Synchronization between processes
Upvotes: 1