Reputation: 11
I have around 5,000 CSV files which I have to open one by one, process and close.
Instead of writing multiprocessing code, I tried to replicate the effect with the open function: the script creates an empty marker file for each CSV it claims, so I can run this Python script several times at once and get multiprocessing-like behaviour.
If I run the script, say, 5 times, the files are saved very fast at the beginning because 5 instances are running. But as time goes on, after around 50-60% of the files have been processed, everything becomes very slow, to the point where a single instance seems faster than 5 instances running simultaneously.
What can be the reason behind this?
import pandas as pd
import os
import datetime, time

files = os.listdir('E:\\Data\\stocks')

for i in files:
    # skip files another running instance has already claimed
    if i in os.listdir('E:\\Tanmay\\Data\\Zerodha another\\'):
        continue
    # create an empty marker file so the other instances skip this one
    d = open('E:\\Tanmay\\Data\\Zerodha another\\' + i, 'a+')
    d.close()
    df = pd.read_csv('E:\\Data\\stocks\\' + i)
    df.rename(columns={'Datetime': 'Date_time'}, inplace=True)
    df['Date_time'] = df['Date_time'].str.split('+').str[0]
    df['Date_time'] = pd.to_datetime(df['Date_time'], format='%Y-%m-%d %H:%M:%S')
    df['Date'] = df['Date_time'].dt.date
    df['Time'] = df['Date_time'].dt.time
    df.to_csv('E:\\Tanmay\\Data\\Zerodha another\\' + i, index=False)
    print(i)
Upvotes: 1
Views: 743
Reputation: 1
from typing import List
import aiofiles
import asyncio
import pandas as pd
import os

files = os.listdir('E:\\Data\\stocks')

async def process_file(_df):
    # same transformation as in the question
    _df.rename(columns={'Datetime': 'Date_time'}, inplace=True)
    _df['Date_time'] = _df['Date_time'].str.split('+').str[0]
    _df['Date_time'] = pd.to_datetime(_df['Date_time'], format='%Y-%m-%d %H:%M:%S')
    _df['Date'] = _df['Date_time'].dt.date
    _df['Time'] = _df['Date_time'].dt.time
    return _df

async def _is_valid(_file) -> bool:
    """Checks whether the file can be opened and is non-empty, i.e. it is a valid file."""
    # noinspection PyBroadException
    try:
        async with aiofiles.open('E:\\Data\\stocks\\' + _file, mode='r') as f:
            return bool(await f.read())
    except Exception:
        return False

async def read_and_process(_files: List[str]) -> List[str]:
    """Not completely asynchronous, but much better than before."""
    # build the list of valid files once, to avoid re-checking inside the loop
    _valid_files: List[str] = [file for file in _files if await _is_valid(file)]
    for _file in _valid_files:
        processed_file = await process_file(pd.read_csv('E:\\Data\\stocks\\' + _file))
        # to_csv is a regular (blocking) pandas call, so it is not awaited
        processed_file.to_csv('E:\\Tanmay\\Data\\Zerodha another\\' + _file, index=False)
    # return the list of processed files so the filenames can be printed once at the end
    # rather than one per iteration -- I/O inside a loop slows execution unless it is asynchronous
    return _valid_files

if __name__ == '__main__':
    files = asyncio.run(read_and_process(_files=files))
    # NOTE: printing the whole list once at the end also increases the speed
    print(*files)
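Note that the pandas calls above (read_csv, to_csv) still block the event loop. As a minimal sketch (my addition, not part of the original answer), the blocking work could be pushed onto worker threads with asyncio.to_thread (Python 3.9+); the SRC/DST constants and the _convert/main helpers are hypothetical names, the paths are the ones from the question:

import asyncio
import os
import pandas as pd

SRC = 'E:\\Data\\stocks\\'
DST = 'E:\\Tanmay\\Data\\Zerodha another\\'

def _convert(name: str) -> str:
    # blocking pandas work; runs in a worker thread
    df = pd.read_csv(SRC + name)
    df.rename(columns={'Datetime': 'Date_time'}, inplace=True)
    df['Date_time'] = df['Date_time'].str.split('+').str[0]
    df['Date_time'] = pd.to_datetime(df['Date_time'], format='%Y-%m-%d %H:%M:%S')
    df['Date'] = df['Date_time'].dt.date
    df['Time'] = df['Date_time'].dt.time
    df.to_csv(DST + name, index=False)
    return name

async def main() -> None:
    names = os.listdir(SRC)
    # schedule each conversion on the default thread pool and wait for all of them
    done = await asyncio.gather(*(asyncio.to_thread(_convert, n) for n in names))
    print(*done)

if __name__ == '__main__':
    asyncio.run(main())

Since pandas holds the GIL for most of the parsing, threads mainly overlap the file I/O; for the CPU-bound part a process pool (as in the answer below) is the better fit.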
Upvotes: 1
Reputation: 39404
You should pre-filter the file list first, to remove the O(n^2) effect of re-scanning the target folder and testing each successive file against its ever-growing list of names:
targets = os.listdir('E:\\Tanmay\\Data\\Zerodha another\\')
files = [i for i in os.listdir('E:\\Data\\stocks') if i not in targets]

for i in files:
    ...
In the above code, files becomes a list of the files in the source folder that are not already present in the target folder, and the target folder is scanned only once for all its filenames.
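Since each membership test against a list is itself linear, keeping the target names in a set makes the lookup O(1). A small sketch of the same idea (my addition, not from the answer):

import os

# scan the target folder once and keep the names in a set for O(1) lookups
targets = set(os.listdir('E:\\Tanmay\\Data\\Zerodha another\\'))
files = [i for i in os.listdir('E:\\Data\\stocks') if i not in targets]

If several instances of the script run at the same time, a list built at start-up goes stale, so the per-file marker check from the question is still useful inside the loop.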
Upvotes: 0
Reputation: 23815
You need to go with something like the below
from multiprocessing import Pool
import os

def handle_csv(file_name):
    # TODO - implement the per-file processing here
    pass

if __name__ == '__main__':
    with Pool(5) as p:  # TODO: change 5, the number of worker processes
        files_to_handle = os.listdir('E:\\Data\\stocks')  # the source folder from the question
        print(p.map(handle_csv, files_to_handle))
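A sketch of how handle_csv could be filled in with the processing from the question (the SRC/DST constants are my names, the paths and transformation come from the question; the existence check is an assumption that lets re-runs skip files already written):

from multiprocessing import Pool
import os
import pandas as pd

SRC = 'E:\\Data\\stocks\\'
DST = 'E:\\Tanmay\\Data\\Zerodha another\\'

def handle_csv(file_name):
    # skip files that were already processed in an earlier run
    if os.path.exists(DST + file_name):
        return file_name
    df = pd.read_csv(SRC + file_name)
    df.rename(columns={'Datetime': 'Date_time'}, inplace=True)
    df['Date_time'] = df['Date_time'].str.split('+').str[0]
    df['Date_time'] = pd.to_datetime(df['Date_time'], format='%Y-%m-%d %H:%M:%S')
    df['Date'] = df['Date_time'].dt.date
    df['Time'] = df['Date_time'].dt.time
    df.to_csv(DST + file_name, index=False)
    return file_name

if __name__ == '__main__':
    with Pool(5) as p:  # 5 worker processes; adjust to the machine
        print(p.map(handle_csv, os.listdir(SRC)))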
Upvotes: 1