Reputation: 346
Since I work with many .csv files and I have to often convert them to different formats, I really want to make an app for that in Python so I do not have to do it manually every time in notepad++.
The code below does work in single core mode. However, I would like to incorporate multiprocessing to process a few threads at a time.
My problem is that when I use multiprocessing, the code finishes without any error and nothing printed out.
Can you please help me troubleshoot it?
import pandas as pd
from multiprocessing import Process
import os
import time
start_time = time.time()
thread_count = 2
def timer():
print('Script ran for ' + time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)))
class Imported:
def __init__(self, filename, convert_type):
self.filename, self.convert_type = filename, convert_type
if self.convert_type == 'to_hana':
self.delim = ';'
self.decim = ','
if self.convert_type == 'from_hana':
self.delim = ','
self.decim = '.'
if self.convert_type not in ['to_hana' , 'from_hana']:
raise ValueError('Convert type is wrong!')
def convert(self):
if self.convert_type == 'to_hana':
self.file = pd.read_csv(self.filename, encoding = 'utf-8', sep=self.delim, decimal=self.decim)
self.error_check()
self.delim = ','
self.decim = '.'
process_id = os.getpid()
print(f'Process ID: {process_id}')
self.file.to_csv(self.filename, index = None, header=True, decimal=self.decim, sep=self.delim)
print('test1')
timer()
if self.convert_type == 'from_hana':
self.file = pd.read_csv(self.filename, encoding = 'utf-8', sep=self.delim, decimal=self.decim)
self.error_check()
self.delim = ';'
self.decim = ','
process_id = os.getpid()
print(f'Process ID: {process_id}')
self.file.to_csv(self.filename, index = None, header=True, decimal=self.decim, sep=self.delim)
print('test2')
timer()
def error_check(self):
if len(list(self.file.columns.values)[0]) > 20:
raise ValueError('Pravědpodobně poškozený soubor. Název prvního sloupce je příliš dlouhý.')
if __name__ == '__main__':
filenames = [['data.csv','from_hana'],['klasifikace.csv','to_hana'],['klasifikace_statistiky.csv','to_hana']]
processes = []
#for i in enumerate(filenames):
# Imported.convert(Imported(filenames[i[0]][0], filenames[i[0]][1]))
for i in enumerate(filenames):
process = Process(target=Imported.convert, args=(filenames[i[0]][0], filenames[i[0]][1]))
processes.append(process)
process.start()
for process in processes:
process.join()
print('DONE')
Upvotes: 1
Views: 216
Reputation: 781
import subprocess
from multiprocessing import Pool, cpu_count
import pandas as pd
def multi_processor(function_name, file_path, output_format):
file_list = []
#file_list = str(subprocess.check_output(f"find {file_path} -type f -iname \"*.csv*\" ", shell=True)).split('\\n')
#file_list = sorted(file_list)
# Test, put 6 strings in the list so your_function should run six times with 6 processors in parallel
file_list.append("file_path1")
file_list.append("file_path2")
file_list.append("file_path3")
file_list.append("file_path4")
file_list.append("file_path5")
file_list.append("file_path6")
# Use max number of system processors - 1
pool = Pool(processes=cpu_count()-1)
pool.daemon = True
results = {}
# for every file in the file list, start a new process
for each_file in file_list:
results[each_file] = pool.apply_async(your_function, args=(output_format, each_file))
# Wait for all processes to finish before proceeding
pool.close()
pool.join()
# Results and any errors are returned
return {your_function: result.get() for your_function, result in results.items()}
def your_function(output_format, file_name):
try:
df = pd.read_csv(file_name)
writer = pd.ExcelWriter(f"{file_name}{output_format}")
df.to_excel(writer)
writer.save()
return "Success!"
except Exception as e:
return str(e)
if __name__ == "__main__":
some_results = multi_processor("your_function", "some_path_to_csv_files", ".xlsx")
print(some_results)
Upvotes: 0
Reputation: 1759
You can solve it by creating object of your class and then starting a process by giving target as obj.convert
import pandas as pd
from multiprocessing import Process
import os
import time
start_time = time.time()
thread_count = 2
def timer():
print('Script ran for ' + time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time)))
class Imported:
def __init__(self, filename, convert_type):
self.filename, self.convert_type = filename, convert_type
if self.convert_type == 'to_hana':
self.delim = ';'
self.decim = ','
if self.convert_type == 'from_hana':
self.delim = ','
self.decim = '.'
if self.convert_type not in ['to_hana', 'from_hana']:
raise ValueError('Convert type is wrong!')
def convert(self):
if self.convert_type == 'to_hana':
self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
self.error_check()
self.delim = ','
self.decim = '.'
process_id = os.getpid()
print('Process ID:', process_id)
self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
print('test1')
timer()
if self.convert_type == 'from_hana':
self.file = pd.read_csv(self.filename, encoding='utf-8', sep=self.delim, decimal=self.decim)
self.error_check()
self.delim = ';'
self.decim = ','
process_id = os.getpid()
print('Process ID', process_id)
self.file.to_csv(self.filename, index=None, header=True, decimal=self.decim, sep=self.delim)
print('test2')
timer()
def error_check(self):
if len(list(self.file.columns.values)[0]) > 20:
raise ValueError('Pravědpodobně poškozený soubor. Název prvního sloupce je příliš dlouhý.')
if __name__ == '__main__':
filenames = [['data.csv', 'from_hana'], ['klasifikace.csv', 'to_hana'], ['klasifikace_statistiky.csv', 'to_hana']]
processes = []
# for i in enumerate(filenames):
# Imported.convert(Imported(filenames[i[0]][0], filenames[i[0]][1]))
for i in enumerate(filenames):
obj = Imported(filenames[i[0]][0], filenames[i[0]][1])
process = Process(target=obj.convert)
processes.append(process)
process.start()
for process in processes:
process.join()
print('DONE')
Upvotes: 1