Reputation: 3208
Suppose I have a class and want to read a few files from disk in parallel, then assign the results to the class's attributes. What is the most correct way to do it (and how)?
I thought about threading, since these are only I/O operations.
Example of a non-parallel (single-threaded) implementation:
import pandas as pd

class DataManager(object):
    def __init__(self):
        self.a = None
        self.b = None
        self.c = None
        self.d = None
        self.e = None
        self.f = None

    def load_data(self):
        self.a = pd.read_csv('a.csv')
        self.b = pd.read_csv('b.csv')
        self.c = pd.read_csv('c.csv')
        self.d = pd.read_csv('d.csv')
        self.e = pd.read_csv('e.csv')
        self.f = pd.read_csv('f.csv')

if __name__ == '__main__':
    dm = DataManager()
    dm.load_data()
    # Main thread is waiting for load_data to finish.
    print("finished loading data")
Upvotes: 5
Views: 3213
Reputation: 348
I/O operations are not CPU-bound in most cases, so using multiple processes is overkill. Using multiple threads can be good, but pd.read_csv not only reads the file, it also parses it, which can be CPU-bound. I suggest reading the files from disk with asyncio, since it was designed for exactly this purpose. Here is the code to do so:
import asyncio
import aiofiles

async def read_file(file_name):
    async with aiofiles.open(file_name, mode='rb') as f:
        return await f.read()

def read_files_async(file_names: list) -> list:
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        asyncio.gather(*[read_file(file_name) for file_name in file_names]))

if __name__ == '__main__':
    contents = read_files_async([f'files/file_{i}.csv' for i in range(10)])
    print(contents)
The function read_files_async returns a list of file contents (byte buffers). Note that pd.read_csv expects a path or a file-like object rather than raw bytes, so wrap each buffer in io.BytesIO before passing it on.
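For example (a minimal sketch; parse_csv_bytes is a hypothetical helper name, io.BytesIO is from the standard library):

import io
import pandas as pd

def parse_csv_bytes(content):
    # pd.read_csv accepts any file-like object
    return pd.read_csv(io.BytesIO(content))

data_frames = [parse_csv_bytes(content) for content in contents]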
I think optimizing the file reading alone should be enough, but you can also parse the file contents in parallel with multiple processes (threads and asyncio won't speed up the CPU-bound parsing):
import multiprocessing as mp

NUMBER_OF_CORES = 4

with mp.Pool(NUMBER_OF_CORES) as pool:
    # parse_csv_bytes is the module-level helper shown above
    data_frames = pool.map(parse_csv_bytes, contents)

You should set NUMBER_OF_CORES according to your machine's spec; mp.cpu_count() is a reasonable default.
Upvotes: 7
Reputation: 3801
A possible solution with Python 3's ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
import queue
import pandas as pd

def load_data_worker(data_queue, file_name):
    data_queue.put(pd.read_csv(file_name))

class DataManager(object):
    def __init__(self):
        self.data_queue = queue.Queue()
        self.data_arr = []

    def load_data(self):
        with ThreadPoolExecutor() as executor:
            executor.submit(load_data_worker, self.data_queue, 'a.csv')
            executor.submit(load_data_worker, self.data_queue, 'b.csv')
            # ...
            executor.submit(load_data_worker, self.data_queue, 'f.csv')
        # the with-block exits only after all workers finish;
        # dump the Queue of loaded data to an array
        self.data_arr = list(self.data_queue.queue)

if __name__ == '__main__':
    dm = DataManager()
    dm.load_data()
    # Main thread is waiting for load_data to finish.
    print("finished loading data")
Upvotes: 1