Reputation: 175
I am trying to find the cause of slow CSV reading.
I tried multiple approaches. I have an 8 GB CSV file; after processing it is around 6 GB with 10 columns.
My idea was to read the file in one thread and process it in another, so the reader does not lose any bandwidth; it is basically an approach I found in another Stack Overflow thread.
Right now it takes around 1112 seconds just to read the file, which equals roughly 7 MB/s. Reading through SQL on this drive gets me around 380 MB/s, so there must be some bottleneck or something else going on.
I am not worried about the processing itself; I am just interested in reading the file into memory as fast as possible and then processing it. There is probably some issue with my code, because pandas is way faster (although still nowhere near the disk speed), see at the bottom.
Maybe that is just how it is, but I am not happy with it.
import os, csv, time, math
from queue import Queue
from threading import Thread

file = r'local_disk_file.csv'
out = r'network_location'

_sentinel = object()


def convert10(x10, y10):
    # some processing
    return gridcellid10


def read_file(file, q):
    start = time.monotonic()
    with open(file, 'r', newline='') as inFile:
        next(inFile)
        for row in inFile:
            q.put(row)
    q.put(_sentinel)
    print('File read in {}s'.format(time.monotonic() - start))


def post_process(in_q):
    with open(os.path.join(out, "output_on_network.csv"), 'w', newline='') as outFile:
        writer = csv.writer(outFile)
        row = ['AreaID', 'CellID', 'Case1', 'Case2', 'Case3', 'Case4', 'Case5', 'Case6', 'Case7', 'Case8']
        writer.writerow(row)
        for row in iter(in_q.get, _sentinel):
            reader = csv.reader([row])
            for row in reader:
                cellid = convert10(int(row[1]), int(row[2]))
                final_row = [row[0], cellid]
                switch = False
                for item in row[6:]:
                    if int(item) > 15000:
                        switch = True
                        print('Broken row, skipping')
                        print('\t' + ' '.join(row))
                final_row.extend(row[6:])
                if not switch:
                    writer.writerow(final_row)


def main():
    q = Queue()
    t1 = Thread(target=read_file, args=(file, q))
    t2 = Thread(target=post_process, args=(q,))
    t1.start()
    t2.start()


if __name__ == '__main__':
    main()
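One thing I suspect (just a guess on my part) is that calling `q.put` once per row is expensive because of the locking inside `Queue`. A sketch of what I might try instead, batching rows before queueing (the batch size is a guess, not tuned):
def read_file_batched(file, q, batch_size=10000):
    # Same reader as above, but push lists of rows instead of single rows
    # to cut down on per-item Queue overhead.
    start = time.monotonic()
    batch = []
    with open(file, 'r', newline='') as in_file:
        next(in_file)  # skip the header
        for row in in_file:
            batch.append(row)
            if len(batch) >= batch_size:
                q.put(batch)
                batch = []
    if batch:
        q.put(batch)
    q.put(_sentinel)
    print('File read in {}s'.format(time.monotonic() - start))
The consumer would then loop over each batch instead of a single row.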
I tried to use pandas and that is way faster. The below code takes around 92 seconds, which equals roughly 81 MB/s.
import pandas as pd, time
file = r'local_disk_file.csv'
start = time.monotonic()
df = pd.read_csv(file)
print(time.monotonic()-start)
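Something I would also try here (assuming a recent enough pandas, 1.4 or later, with pyarrow installed) is the pyarrow parser, which reads the CSV with multiple threads:
import pandas as pd, time

file = r'local_disk_file.csv'

start = time.monotonic()
# engine="pyarrow" parses the CSV multithreaded instead of in a single thread;
# it requires pandas >= 1.4 and the pyarrow package.
df = pd.read_csv(file, engine='pyarrow')
print(time.monotonic() - start)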
Edit: I also tried just reading the file and doing nothing with it. That takes 45 s, which equals roughly 177 MB/s, and I am happy with that.
import time

file = r'local_disk_file.csv'

start = time.monotonic()
with open(file, 'r', newline='') as in_file:
    for row in in_file:
        pass
print(time.monotonic() - start)
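To separate the raw disk speed from text decoding and line splitting, I could also time plain binary reads in large blocks (the block size below is just a guess):
import time

file = r'local_disk_file.csv'
block = 8 * 1024 * 1024  # 8 MiB per read

start = time.monotonic()
read_bytes = 0
with open(file, 'rb') as in_file:  # binary mode: no decoding, no line splitting
    while True:
        chunk = in_file.read(block)
        if not chunk:
            break
        read_bytes += len(chunk)
print(f'{read_bytes / (time.monotonic() - start) / 1e6:.0f} MB/s raw')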
Upvotes: 0
Views: 6395
Reputation: 175
So the best option for me is to read the file through pandas and then apply parallel processing. This way I achieve a reading speed of around 65 MB/s. It is nowhere near the maximum speed of the drive, but it considerably speeds up my problem.
Another good option is to read the CSV once, save it as a parquet file and work with that. That way we save a lot of disk space and the read speed is very fast, because parquet is read in parallel; the more columns, the better the speed!
If I read the CSV, process it and save the result as parquet, I get a very big speed-up.
With the conversion to parquet, processing it and writing it back as a parquet file, I can process the whole file in around 140 s. If I read the CSV with pandas instead, the reading alone takes about the same amount of time, and writing the file back to disk is another bottleneck.
For me this means: stop using CSV files!
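A minimal sketch of the one-off CSV to parquet conversion (assuming pyarrow is installed; the paths are placeholders for my real files):
import pyarrow.csv as pacsv
import pyarrow.parquet as pq

table = pacsv.read_csv(r'local_disk_file.csv')  # pyarrow reads the CSV multithreaded
pq.write_table(table, r'in_parquet.pqt')        # columnar file used by the script below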
import time, math, pandas as pd
import multiprocessing as mp
import pyarrow.parquet as pq

file = r'in_parquet.pqt'
out = r'out_parquet.pqt'


def gridcellid(x, y, xmin, ymin, xshift, yshift, m, n, r, mtidshift):
    ## some processing
    return gridcellid


def read(file):
    start = time.monotonic()
    df = pd.read_parquet(file, engine='pyarrow')
    print(f'File read in {time.monotonic()-start}s')
    return df


def calculate(df):
    df['CellID'] = 0
    df['CellID'] = [gridcellid(x, y, 97170, 274320, 0, 0, 0, 6, 10, 0) for x, y in zip(df['x'], df['y'])]
    cols = ['Domain', 'CellID', 'RP00005', 'RP00010', 'RP00050', 'RP00100', 'RP00200', 'RP00500', 'RP01000', 'RP10000']
    df = df.drop(columns=['uwPop', 'uwInd', 'a01_5dPC', 'x', 'y'])  # drop returns a new frame
    df = df.reindex(columns=cols)
    df.rename(columns={"Domain": "AreaID", "RP00005": "Case1",
                       "RP00010": "Case2", "RP00050": "Case3", "RP00100": "Case4",
                       "RP00200": "Case5", "RP00500": "Case6", "RP01000": "Case7", "RP10000": "Case8"},
              inplace=True)
    return df


def parallelize_dataframe(df, func, n_cores=16):
    n = 100000  # rows per chunk
    df_split = [df[i:i + n] for i in range(0, df.shape[0], n)]
    pool = mp.Pool(n_cores)
    li_df = []
    for i in pool.imap(func, df_split):
        li_df.append(i)
    df = pd.concat(li_df)
    pool.close()
    pool.join()
    return df


if __name__ == '__main__':
    start = time.monotonic()
    df_input = read(file)
    df_merged = parallelize_dataframe(df_input, calculate)
    df_merged.to_parquet(out, engine='pyarrow', index=False)
    print(f'File completely processed in {time.monotonic()-start}s')
Upvotes: 2