Reputation: 55
I have a number of huge CSV files (20GB-ish) that I need to read, process, and then write the processed data back out to a new CSV.
The head of the csv file looks like this:
The object of the task is to read the CSV file and compare the time on each line to see whether it falls within the start and end times contained in a dictionary. If it doesn't, the line is skipped; if it does, it is written to the new file.
Sounds very easy, but due to the size, being efficient is crucial, and I need some advice.
I've tried a number of methods, including reading the whole file with pandas, which was taking a long time or crashing due to memory issues. I also tried opening and reading the file line by line and then processing it, but this also seemed to take a long time. My current line of attack is using dask, but before I carry on I wanted to see if anyone can give me any hints on improving speed:
What would be the plan of attack for each stage above to produce the fastest results, bearing in mind I have 16GB of RAM?
This is the code that I came up with after reading your responses. I incorporated a bit of code to display the percentage complete. It appears that the % complete slows down a lot as the file progresses, as if there is a memory issue or another problem. For a 20GB file it was taking over 4 hours to process, which seems a very long time. I looked at the system resources during processing: CPU was sitting at about 16% and memory at about 10GB of 16GB. Can someone tell me if this is the sort of speed I can expect, or is there something wrong with the code?
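For reference, the darwinex_quote_hours dictionary imported from symbol_specification isn't shown here. Judging by how the code below indexes it, each symbol maps weekday names to parallel 'start'/'end' lists of 'HH:MM' strings, roughly like this (the values are illustrative only; the real ones live in symbol_specification):
darwinex_quote_hours = {
    'CADCHF': {'Mon': {'start': ['00:00'], 'end': ['24:00']},
               'Tues': {'start': ['00:00'], 'end': ['24:00']},
               # ... Wed through Sun ...
               },
    # ... one entry per symbol ...
}
The full script: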
from datetime import datetime, timedelta, timezone
from symbol_specification import darwinex_quote_hours, darwinex_symbol_specs
import os
import csv
import pandas as pd
import io
import tailer
# Software Variables and Information
__author__ = 'TBC'
__copyright__ = 'TBC'
__credits__ = ['TBC']
__license__ = 'GPL'
__version__ = '1.0.4dev'
__maintainer__ = 'TBC'
__email__ = 'TBC'
__status__ = 'Production'
software_name = 'TBC'
def get_digits(str_val):
return len(str_val.split('.')[-1])
def get_in_filepath(dir_name, filename):
    return dir_name + '\\' + filename
def get_out_filepath(dir_name, filename):
    return dir_name + '\\' + filename.split('.csv')[0] + '_OOH_removed.csv'
def get_input_variables():
dir_name = r"X:"
# dictionary containing darwinex symbol id and csv filename
run_details = {'CADCHF': 'CADCHF_mt5_ticks.csv',
'CADJPY': 'CADJPY_mt5_ticks.csv',
'CHFJPY': 'CHFJPY_mt5_ticks.csv',
'EURAUD': 'EURAUD_mt5_ticks.csv',
'EURCAD': 'EURCAD_mt5_ticks.csv',
'EURCHF': 'EURCHF_mt5_ticks.csv',
'EURGBP': 'EURGBP_mt5_ticks.csv',
'EURJPY': 'EURJPY_mt5_ticks.csv',
'EURNZD': 'EURNZD_mt5_ticks.csv'}
# remove out of hours ticks?
remove_out_of_hours_ticks = True
# round OHLC values?
round_ohlc_required = False
    # remove trailing zeros - ** saves space in the output file
remove_trailing_zeros = True
# get quote hours as shown in MT5 symbol specification
quote_hours = darwinex_quote_hours
# specify the broker timezone in hours
# ****************************IMPORTANT NOTE******************************
    # The quote hours specified above relate to the broker time zone, which may
    # be different to the timezone that the tickstory data was downloaded in.
    # Following my data download guidelines, the tickstory data should be
    # downloaded in the (UTC+00:00) Dublin, Edinburgh, Lisbon, London timezone
    # WITH DST. The quote hours specified in the MT5 specification are in the
    # broker server time, which could be different. For example, Darwinex is
    # UTC+2, so the broker time offset is +2. The code accounts for this by
    # adding the offset to each tick time before comparing against quote hours.
# ************************************************************************
broker_time_offset = 2
# create input dictionary
input_vars = {
'dir_name': dir_name,
'run_details': run_details,
'remove_out_of_hours_ticks': remove_out_of_hours_ticks,
'remove_trailing_zeros': remove_trailing_zeros,
'quote_hours': quote_hours,
'quote_broker_time_offset': broker_time_offset,
'round_ohlc_required': round_ohlc_required,
}
return input_vars
def round_ohlc(line, digits, input_vars):
# assign vals
date = line[0]
time = line[1]
bid = line[2]
ask = line[3]
last = line[4]
vol = line[5]
if digits != 0:
bid = round(float(bid), digits)
ask = round(float(ask), digits)
last = round(float(last), digits)
else:
bid = int(round(float(bid), digits))
ask = int(round(float(ask), digits))
last = int(round(float(last), digits))
    # assemble line
    if input_vars['remove_trailing_zeros'] and digits != 0:
        # only strip when there is a decimal part; stripping an integer such
        # as 1200 would wrongly turn it into 12
        line = [date, time, f'{bid:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{ask:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{last:.{digits}f}'.rstrip('0').rstrip('.'), vol]
    else:
        line = [date, time, f'{bid:.{digits}f}', f'{ask:.{digits}f}',
                f'{last:.{digits}f}', vol]
    return line
def get_weekday_string(day_digit):
weekdays = {0: 'Mon',
1: 'Tues',
2: 'Wed',
3: 'Thurs',
4: 'Fri',
5: 'Sat',
6: 'Sun'}
return weekdays[day_digit]
def remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
# get quote offset
quote_offset = input_vars['quote_broker_time_offset']
# adjust tick_datetime for offset
tick_datetime_adj = tick_datetime + timedelta(hours=quote_offset)
# get quote hours
day_string = get_weekday_string(tick_datetime_adj.weekday())
quote_hours = symbol_quote_hours[day_string]
    # initialise remove_tick to True (remove by default)
    remove_tick = True
# iterate through all quote start/end pairs and check to see if tick is in hours
for idx in range(len(quote_hours['start'])):
tick_time = tick_datetime_adj
# get date of tick
tick_date = tick_time.date()
# form quote hours start time
start_time = datetime.strptime(quote_hours['start'][idx], '%H:%M').time()
# combine the date and quote start time to form datetime
start = datetime.combine(tick_date, start_time)
if quote_hours['end'][idx] == '24:00':
# special case. 24:00 means to the end of the day but it's not
# recognised by python as a valid datetime. To get around this the
# day is incremented by 1 and a time of 00:00 used which is equivalent.
# form quote hours end time
end_time = datetime.strptime('00:00', '%H:%M').time()
# combine the date and quote end time to form datetime
end = datetime.combine(tick_date + timedelta(days=1), end_time)
else:
# form quote hours end time
end_time = datetime.strptime(quote_hours['end'][idx], '%H:%M').time()
# combine the date and quote end time to form datetime
end = datetime.combine(tick_date, end_time)
        # check to see if tick is within quote hours; stop at the first match
        if start <= tick_time <= end:
            remove_tick = False
            break
    return remove_tick
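# NOTE: remove_out_of_hours_tick() above re-parses the same 'HH:MM' boundary
# strings with datetime.strptime() for every single tick; parsing each
# symbol's quote hours once, before the per-line loop, would avoid repeating
# that work millions of times.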
def write_conversion_log(input_filename, output_filename,
mod_string, conversion_time, input_vars):
# conversion log file name
working_dir = input_vars['dir_name']
    filename = working_dir + '\\tickstory_tick_pre_processor_conversions.log'
# determine if file exists or not and assign appropriate opening flag
if os.path.exists(filename):
append_write = 'a' # append if already exists
else:
append_write = 'w' # make a new file if not
with open(filename, append_write) as outfile:
outfile.write('--------------------------------------------------------'
'-------------------------\n')
outfile.write('Conversion Details\n')
outfile.write(f'Software Name: {software_name}\n')
outfile.write(f'Software Version: {__version__}\n')
outfile.write(f'Date/Time: {datetime.now(timezone.utc)}\n')
outfile.write(f'Input file = {input_filename}\n')
outfile.write(f'{mod_string}\n')
outfile.write(f'Output file = {output_filename}\n')
        outfile.write(f'Conversion Duration: {conversion_time}\n')
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
def get_start_end_date(filename):
# Get the first 3 rows of the file
    df_start = pd.read_csv(filepath_or_buffer=filename,
                           encoding='utf8',
                           header=None,
                           nrows=3)
# Add column names
df_start.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']
# create Datetime column
df_start['Datetime'] = df_start['Date'].astype(str) + ' ' + df_start['Time'].astype(str)
df_start['Datetime'] = pd.to_datetime(df_start['Datetime'], format='%Y%m%d %H:%M:%S')
# Get the last 3 rows of the file
with open(filename, 'r', encoding='utf8') as file:
last_lines = tailer.tail(file, 3)
    # drop any empty trailing lines (stray line feeds / carriage returns at
    # the end of the file)
    while last_lines[-1] == '':
        last_lines = last_lines[:-1]
df_end = pd.read_csv(io.StringIO('\n'.join(last_lines[1:])), header=None)
# Add column names
df_end.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']
# create Datetime column
df_end['Datetime'] = df_end['Date'].astype(str) + ' ' + df_end['Time'].astype(str)
df_end['Datetime'] = pd.to_datetime(df_end['Datetime'], format='%Y%m%d %H:%M:%S')
# Add Start and End time to Symbol Info dictionary
start_date = df_start['Datetime'][0]
end_date = df_end['Datetime'][0]
return start_date, end_date
def get_percentage_complete(start, end, tick_datetime):
total_period = end - start
period_complete = tick_datetime - start
pct_complete = (period_complete / total_period) * 100
return pct_complete
def main():
# get input variables
input_vars = get_input_variables()
for darwinex_id, filename in input_vars['run_details'].items():
# get filenames
input_filename = get_in_filepath(input_vars['dir_name'], filename)
output_filename = get_out_filepath(input_vars['dir_name'], filename)
        # get the start and end dates of the data so % complete can be determined
start_date, end_date = get_start_end_date(input_filename)
# get symbol quote hours
symbol_quote_hours = darwinex_quote_hours[darwinex_id]
# read csv
before_process = datetime.now()
# initialise counters and mod string
ticks_removed_count = 0
mod_string = ''
file_converted = False
percentage_complete = 0
        # if processing is required, open the input and output files and process as required
if input_vars['remove_out_of_hours_ticks'] or input_vars['round_ohlc_required']:
file_converted = True
with open(input_filename, 'r', newline='') as f_in:
with open(output_filename, 'w', newline='') as f_out:
                    # set up csv reader and writer
reader = csv.reader(f_in)
writer = csv.writer(f_out)
# for each line check whether datetime is within hours. If it is keep line, if not skip
for idx, line in enumerate(reader):
tick_datetime = datetime.strptime(line[0] + ' ' + line[1], '%Y%m%d %H:%M:%S')
if not remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
# keep line
# convert OHLC values if required
if input_vars['round_ohlc_required']:
digits_round = darwinex_symbol_specs[darwinex_id]['digits']
line = round_ohlc(line, digits_round, input_vars)
# write line to new file
writer.writerow(line)
else:
ticks_removed_count += 1
# determine and output the % complete after every 1000 lines
if idx % 1000 == 0:
percentage_complete_new = get_percentage_complete(start_date, end_date, tick_datetime)
                            if int(percentage_complete_new) != int(percentage_complete):
percentage_complete = int(percentage_complete_new)
print(f'{input_filename} % Complete: {percentage_complete:.0f}%', end='\r')
# calculate conversion time
after_process = datetime.now()
conversion_time = after_process - before_process
if mod_string != '':
newline = '\n'
else:
newline = ''
# update tick removal modification string
if input_vars['remove_out_of_hours_ticks']:
mod_string = mod_string + (f'{newline}***Tick Removal***\n'
f'{ticks_removed_count} ticks have been removed')
else:
mod_string = mod_string + f'{newline}***Tick removal NOT requested***'
if mod_string != '':
newline = '\n'
else:
newline = ''
# update rounding modification string
if input_vars['round_ohlc_required']:
mod_string = mod_string + (f'{newline}***OHLC values converted***\n'
f'OHLC values rounded to {digits_round} digits')
if input_vars['remove_trailing_zeros']:
                mod_string = mod_string + '\nOHLC values trailing zeros removed'
else:
mod_string = mod_string + (f'{newline}***OHLC value conversion NOT requested***')
if mod_string != '':
newline = '\n'
else:
newline = ''
# update case when no conversion is specified
if not input_vars['remove_out_of_hours_ticks'] and not input_vars['round_ohlc_required']:
            mod_string = 'No Conversion performed as not requested in input parameters'
# write conversion details to log file
if file_converted:
write_conversion_log(input_filename, output_filename,
mod_string, conversion_time, input_vars)
# print conversion details to the terminal
print('----------------------------------------------------------------')
print(f'Software Name: {software_name}')
print(f'Software Version: {__version__}')
print(f'Date/Time: {datetime.now(timezone.utc)}')
print('Conversion Details')
print(f'input file = {input_filename}')
print(mod_string)
print(f'output file = {output_filename}')
print(f'Conversion Duration: {conversion_time}')
print('----------------------------------------------------------------')
else:
print(f'input file = {input_filename} - {mod_string}')
if __name__ == '__main__':
main()
Upvotes: 2
Views: 6778
Reputation: 512
I am not sure about the specific details of your task, but it seems fairly simple to me. Since you haven't provided an MRE, I will answer with pseudocode-like code:
# Let input_filename be your 20gb .csv file
# Let output_filename be your filtered output file
with open(input_filename, 'r') as f_in, open(output_filename, 'w') as f_out:
for line in f_in:
# Abstract: with extract_time() you get the time information with any way
line_time = extract_time(line)
# Abstract: with satisfies_constraint you check the aforementioned dictionary
if satisfies_constraint(dct, line_time):
f_out.write(line)
This way you read the input file line by line in O(N), check the dictionary in O(1), and you don't load the entire file into memory.
What is the issue with this simple way?
I updated my initial answer to be algorithmic enough, without focusing on exactly how you will implement the individual aspects (whether you split the line yourself or read it with csv, etc.). IMO the question concerns the basic procedure for completing the task.
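To make the abstract helpers concrete for the question's row format ('YYYYMMDD,HH:MM:SS,...'), here is a minimal sketch, assuming the weekday-keyed {'start': [...], 'end': [...]} dictionary structure used in the question's code:
from datetime import date

WEEKDAYS = ('Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat', 'Sun')

def extract_time(line):
    # line looks like 'YYYYMMDD,HH:MM:SS,...'
    d = date(int(line[:4]), int(line[4:6]), int(line[6:8]))
    return WEEKDAYS[d.weekday()], line[9:14]  # (weekday name, 'HH:MM')

def satisfies_constraint(dct, line_time):
    day, hhmm = line_time
    windows = dct[day]
    # zero-padded 'HH:MM' strings sort chronologically, so plain string
    # comparison works and the '24:00' end-of-day marker needs no special case
    return any(s <= hhmm <= e for s, e in zip(windows['start'], windows['end']))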
Upvotes: 1
Reputation: 11223
A basic pattern for filtering a CSV looks like:
import csv
f_in = open("input.csv", newline="")
f_out = open("output.csv", "w", newline="")
reader = csv.reader(f_in)
writer = csv.writer(f_out)
writer.writerow(next(reader)) # transfer header, if you have one
for row in reader:
if meets_my_condition(row):
writer.writerow(row)
f_in.close()
f_out.close()
For simple row filtering, this is as fast and memory efficient as you can do it in Python: the reader is iterating a row at a time, so no more than one row in memory at a time; the file reads/writes are buffered, so the IO bottleneck is as low as your system and Python will allow. Any other framework/lib—Dask, etc...—will probably impose some performance overhead compared to this; and never use Pandas to iterate rows.
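If you want to squeeze a little more out of the IO side, one optional knob (worth benchmarking rather than assuming) is a larger file buffer:
# optional: a larger buffer can reduce syscall overhead on very large files;
# the default buffering is usually adequate, so measure before keeping this
BUF_SIZE = 1024 * 1024  # 1 MiB
f_in = open("input.csv", newline="", buffering=BUF_SIZE)
f_out = open("output.csv", "w", newline="", buffering=BUF_SIZE)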
Based on my understanding of your screenshot and description, meets_my_condition needs to check if the value in column 2 is within a range of times; that could look like:
def meets_my_condition(row: list[str]) -> bool:
    return "00:00:00" <= row[1] <= "12:00:00"
Given this input CSV:
Col1,Col2
a,01:00
b,10:00
c,13:00
d,07:00
e,23:00
f,00:00
I get this output CSV:
Col1,Col2
a,01:00
b,10:00
d,07:00
f,00:00
I've spec'd operations like this before, and memory barely exceeds Python's minimum footprint of 7-8MB on my laptop, so with 16GB you'll have more than enough headroom.
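To adapt this to your per-weekday quote-hours dictionary without calling strptime on every row, you could also pre-parse the window boundaries once per symbol. A sketch: compile_windows, to_minutes and WEEKDAYS are my names, the {'start': [...], 'end': [...]} layout is taken from your code, and your broker-offset adjustment is omitted for brevity:
from datetime import date

WEEKDAYS = ('Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat', 'Sun')

def to_minutes(hhmm: str) -> int:
    h, m = hhmm.split(':')
    return int(h) * 60 + int(m)  # '24:00' simply becomes 1440

def compile_windows(symbol_quote_hours: dict) -> dict:
    # parse each 'HH:MM' boundary once, not once per tick
    return {day: list(zip(map(to_minutes, w['start']),
                          map(to_minutes, w['end'])))
            for day, w in symbol_quote_hours.items()}

def meets_my_condition(row: list[str], windows: dict) -> bool:
    # row[0] is 'YYYYMMDD', row[1] is 'HH:MM:SS' (format from the question)
    day = date(int(row[0][:4]), int(row[0][4:6]), int(row[0][6:8])).weekday()
    minute = to_minutes(row[1][:5])
    return any(s <= minute <= e for s, e in windows[WEEKDAYS[day]])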
Upvotes: 4