Reputation: 33
I have 30 CSV files. Each file has 200,000 rows and 10 columns. I want to read these files and do some processing on them. Below is the code without multi-threading:
import os
import time

csv_dir = './csv'
csv_save_dir = './save_csv'
csv_files = os.listdir(csv_dir)

if __name__ == '__main__':
    if not os.path.exists(csv_save_dir):
        os.makedirs(csv_save_dir)
    start = time.perf_counter()
    for csv_file in csv_files:
        csv_file_path = os.path.join(csv_dir, csv_file)
        with open(csv_file_path, 'r') as f:
            lines = f.readlines()
        csv_file_save_path = os.path.join(csv_save_dir, '1_' + csv_file)
        with open(csv_file_save_path, 'w') as f:
            f.writelines(lines[:20])
        print(f'CSV File saved...')
    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')
The elapsed time of the above code is about 7 seconds. I then modified the code to use multiple threads. The code is as follows:
import os
import time
import concurrent.futures

csv_dir = './csv'
csv_save_dir = './save_csv'
csv_files = os.listdir(csv_dir)

def read_and_write_csv(csv_file):
    csv_file_path = os.path.join(csv_dir, csv_file)
    with open(csv_file_path, 'r') as f:
        lines = f.readlines()
    csv_file_save_path = os.path.join(csv_save_dir, '1_' + csv_file)
    with open(csv_file_save_path, 'w') as f:
        f.writelines(lines[:20])
    print(f'CSV File saved...')

if __name__ == '__main__':
    if not os.path.exists(csv_save_dir):
        os.makedirs(csv_save_dir)
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        executor.map(read_and_write_csv, [csv_file for csv_file in csv_files])
    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')
Upvotes: 1
Views: 1665
Reputation: 408
I don't agree with the comments. Python will happily use multiple CPU cores if you have them, executing threads on separate cores.
What I think is the issue here is your test. If you added the "do some processing" step you mentioned to your thread workers, I think you may find the multi-threaded version to be faster. Right now your test merely shows that it takes about 7 seconds to read and write the CSV files, which is I/O-bound and does not take advantage of the CPUs.
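For illustration only, a minimal sketch of what I mean. process_line is a hypothetical placeholder for whatever your real per-line work is, and the worker is your read_and_write_csv with that step added:

import os

csv_dir = './csv'
csv_save_dir = './save_csv'

def process_line(line):
    # Hypothetical placeholder for your real per-line work
    return line.upper()

def read_process_write_csv(csv_file):
    csv_file_path = os.path.join(csv_dir, csv_file)
    with open(csv_file_path, 'r') as f:
        lines = f.readlines()
    # the "do some processing" step, now done inside the thread worker
    processed = [process_line(line) for line in lines]
    csv_file_save_path = os.path.join(csv_save_dir, '1_' + csv_file)
    with open(csv_file_save_path, 'w') as f:
        f.writelines(processed[:20])

You would map this worker over csv_files with the same ThreadPoolExecutor call as in your question.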
If your "do some processing" step is non-trivial, I'd use multi-threading differently. Right now, you are having each thread do:
read csv file
process csv file
save csv file
This way, the threads spend much of their time blocked on I/O during the read and save steps, slowing things down.
For a non-trivial "process" step, I'd do this (pseudo-code):

def process_csv(line):
    <perform your processing on a single line>

<main>:
    for csv_file in csv_files:
        <read lines from csv>
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            executor.map(process_csv, [line for line in lines])
        <write lines out to csv>
Since you're locking on read/write anyway, here at least the work per line is being spread across cores. And you're not trying to read all CSVs into memory simultaneously. Pick a max_workers value appropriate for the number of cores in your system.
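To make that concrete, a runnable sketch of the pseudo-code could look like the following. process_line is again just a placeholder for your real work, and the results of executor.map are collected so they can be written out in order:

import os
import concurrent.futures

csv_dir = './csv'
csv_save_dir = './save_csv'

def process_line(line):
    # Placeholder for your real per-line processing
    return line

if __name__ == '__main__':
    if not os.path.exists(csv_save_dir):
        os.makedirs(csv_save_dir)
    for csv_file in os.listdir(csv_dir):
        with open(os.path.join(csv_dir, csv_file), 'r') as f:
            lines = f.readlines()
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            # map preserves input order; collect the results so they can be written out
            processed = list(executor.map(process_line, lines))
        with open(os.path.join(csv_save_dir, '1_' + csv_file), 'w') as f:
            f.writelines(processed)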
If "do some process" is trivial, my suggestion is probably pointless.
Upvotes: 1