machTucker

Reputation: 55

Fastest way to read huge csv file, process then write processed csv in Python

I have a number of huge csv files (about 20GB each) that I need to read, process, and then write back out to new csv files.

The head of the csv file looks like this: [screenshot]

The aim of the task is to read the csv file and check whether the time on each line falls within the start and end times contained in a dictionary. If it doesn't, the line is skipped; if it does, the line is written to the new file.

It sounds very easy, but because of the file size, efficiency is crucial and I need some advice.

I've tried a number of methods, including reading the whole file in pandas, which took a long time or crashed due to memory issues. I also tried opening and reading the file line by line and then processing it, but this also seemed to take a long time. My current line of attack is using dask, but before I carry on I wanted to see if anyone can give me any hints on improving speed in:

  1. Reading
  2. Processing: this seems to take a long time, as I'm using the apply function in dask to apply the processing function to every row. When I tried this, it took something like 3 hours to process one file.
  3. Writing: it seemed to take a long time to write a single csv file of, say, 20GB. I tried using dask to output each partition and then combining the partitions into one file, which did seem to improve speed a bit.

What would be the best plan of attack for each stage above to produce the fastest results, bearing in mind I have 16GB of RAM?
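Before choosing a plan for each stage, it may help to measure where the time actually goes. The following is a minimal sketch that uses the standard-library cProfile and pstats modules on only the first rows of a file. The names profile_first_rows and its process_line callback are hypothetical. Limiting the run to the first million lines is an arbitrary assumption meant to keep the profiling run short.

```python
import cProfile
import io
import pstats
from itertools import islice


def profile_first_rows(process_line, path, n_rows=1_000_000, top=15):
    """Profile process_line over the first n_rows lines of path.

    Returns the pstats report, sorted by cumulative time, as a string.
    """
    profiler = cProfile.Profile()
    with open(path, newline='') as f:
        profiler.enable()
        # islice stops after n_rows lines, so the rest of the file is never read
        for line in islice(f, n_rows):
            process_line(line)
        profiler.disable()
    report = io.StringIO()
    pstats.Stats(profiler, stream=report).sort_stats('cumulative').print_stats(top)
    return report.getvalue()
```

For example, print(profile_first_rows(my_row_handler, input_filename)), where my_row_handler is a hypothetical function that does one row's worth of work. The functions with the largest cumtime are the ones worth optimising first. Profiling itself slows the run down, so the absolute times are only indicative.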

This is the code I came up with after reading your responses. I added a bit of code to display the percentage complete. The % complete appears to slow down a lot as the file progresses, as if there is a memory issue or some other problem. A 20GB file took over 4 hours to process, which seems a very long time. I watched the system resources during processing: CPU sat at about 16% and memory at about 10GB of 16GB. Can someone tell me whether this is the sort of speed I can expect, or whether there is something wrong with the code?


from datetime import datetime, timedelta, timezone
from symbol_specification import darwinex_quote_hours, darwinex_symbol_specs
import os
import csv
import pandas as pd
import io
import tailer

# Software Variables and Information
__author__ = 'TBC'
__copyright__ = 'TBC'
__credits__ = ['TBC']
__license__ = 'GPL'
__version__ = '1.0.4dev'
__maintainer__ = 'TBC'
__email__ = 'TBC'
__status__ = 'Production'
software_name = 'TBC'


def get_digits(str_val):
    return len(str_val.split('.')[-1])


def get_in_filepath(dir_name, filename):
    return dir_name + r"\\" + filename


def get_out_filepath(dir_name, filename):
    return dir_name + r"\\" + filename.split('.csv')[0] + '_OOH_removed.csv'


def get_input_variables():

    dir_name = r"X:"

    # dictionary containing darwinex symbol id and csv filename
    run_details = {'CADCHF': 'CADCHF_mt5_ticks.csv',
                   'CADJPY': 'CADJPY_mt5_ticks.csv',
                   'CHFJPY': 'CHFJPY_mt5_ticks.csv',
                   'EURAUD': 'EURAUD_mt5_ticks.csv',
                   'EURCAD': 'EURCAD_mt5_ticks.csv',
                   'EURCHF': 'EURCHF_mt5_ticks.csv',
                   'EURGBP': 'EURGBP_mt5_ticks.csv',
                   'EURJPY': 'EURJPY_mt5_ticks.csv',
                   'EURNZD': 'EURNZD_mt5_ticks.csv'}

    # remove out of hours ticks?
    remove_out_of_hours_ticks = True

    # round OHLC values?
    round_ohlc_required = False

    # remove trailing zeros (saves space in the output file)
    remove_trailing_zeros = True

    # get quote hours as shown in MT5 symbol specification
    quote_hours = darwinex_quote_hours

    # specify the broker timezone in hours
    # ****************************IMPORTANT NOTE******************************
    # The quote hours specified above relate to the broker time zone, which may be
    # different from the timezone that the tickstory data was downloaded in.
    # Following my data download guidelines, the tickstory data should be downloaded in
    # the (UTC+00:00) Dublin, Edinburgh, Lisbon, London timezone WITH DST. The quote
    # hours specified in the MT5 specification are in the broker server time,
    # which could be different. For example, Darwinex is UTC+2.
    # Therefore the broker time offset is +2. The code shifts each tick forward by
    # 2 hours before comparing, which is equivalent to subtracting 2 hours from the quote times.
    # ************************************************************************
    broker_time_offset = 2

    # create input dictionary
    input_vars = {
        'dir_name': dir_name,
        'run_details': run_details,
        'remove_out_of_hours_ticks': remove_out_of_hours_ticks,
        'remove_trailing_zeros': remove_trailing_zeros,
        'quote_hours': quote_hours,
        'quote_broker_time_offset': broker_time_offset,
        'round_ohlc_required': round_ohlc_required,
    }
    return input_vars


def round_ohlc(line, digits, input_vars):
    # assign vals
    date = line[0]
    time = line[1]
    bid = line[2]
    ask = line[3]
    last = line[4]
    vol = line[5]

    if digits != 0:
        bid = round(float(bid), digits)
        ask = round(float(ask), digits)
        last = round(float(last), digits)
    else:
        bid = int(round(float(bid), digits))
        ask = int(round(float(ask), digits))
        last = int(round(float(last), digits))

    # assemble line
    if input_vars['remove_trailing_zeros']:
        line = [date, time, f'{bid:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{ask:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{last:.{digits}f}'.rstrip('0').rstrip('.'), vol]
    else:
        line = [date, time, f'{bid:.{digits}f}', f'{ask:.{digits}f}',
                f'{last:.{digits}f}', vol]

    return line


def get_weekday_string(day_digit):
    weekdays = {0: 'Mon',
                1: 'Tues',
                2: 'Wed',
                3: 'Thurs',
                4: 'Fri',
                5: 'Sat',
                6: 'Sun'}

    return weekdays[day_digit]


def remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
    # get quote offset
    quote_offset = input_vars['quote_broker_time_offset']

    # adjust tick_datetime for offset
    tick_datetime_adj = tick_datetime + timedelta(hours=quote_offset)

    # get quote hours
    day_string = get_weekday_string(tick_datetime_adj.weekday())
    quote_hours = symbol_quote_hours[day_string]

    # initialise remove_tick to True (the tick is removed unless it falls in quote hours)
    remove_tick = True

    # iterate through all quote start/end pairs and check to see if tick is in hours
    for idx in range(len(quote_hours['start'])):
        tick_time = tick_datetime_adj

        # get date of tick
        tick_date = tick_time.date()

        # form quote hours start time
        start_time = datetime.strptime(quote_hours['start'][idx], '%H:%M').time()

        # combine the date and quote start time to form datetime
        start = datetime.combine(tick_date, start_time)

        if quote_hours['end'][idx] == '24:00':
            # special case. 24:00 means to the end of the day but it's not
            # recognised by python as a valid datetime. To get around this the
            # day is incremented by 1 and a time of 00:00 used which is equivalent.

            # form quote hours end time
            end_time = datetime.strptime('00:00', '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date + timedelta(days=1), end_time)
        else:
            # form quote hours end time
            end_time = datetime.strptime(quote_hours['end'][idx], '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date, end_time)

        # check to see if tick is within quote hours
        if start <= tick_time <= end:
            remove_tick = False

    return remove_tick


def write_conversion_log(input_filename, output_filename,
                         mod_string, conversion_time, input_vars):
    # conversion log file name
    working_dir = input_vars['dir_name']
    filename = working_dir + '//tickstory_tick_pre_processor_conversions.log'

    # determine if file exists or not and assign appropriate opening flag
    if os.path.exists(filename):
        append_write = 'a'  # append if already exists
    else:
        append_write = 'w'  # make a new file if not

    with open(filename, append_write) as outfile:
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
        outfile.write('Conversion Details\n')
        outfile.write(f'Software Name: {software_name}\n')
        outfile.write(f'Software Version: {__version__}\n')
        outfile.write(f'Date/Time: {datetime.now(timezone.utc)}\n')
        outfile.write(f'Input file = {input_filename}\n')
        outfile.write(f'{mod_string}\n')
        outfile.write(f'Output file = {output_filename}\n')
        outfile.write(f'Conversion Duration: {conversion_time}\n')
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')


def get_start_end_date(filename):
    # Get the first 3 rows of the file
    df_start = pd.read_csv(filepath_or_buffer=filename,
                           encoding='utf8',
                           header=None,  # the tick files have no header row
                           nrows=3)

    # Add column names
    df_start.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_start['Datetime'] = df_start['Date'].astype(str) + ' ' + df_start['Time'].astype(str)
    df_start['Datetime'] = pd.to_datetime(df_start['Datetime'], format='%Y%m%d %H:%M:%S')

    # Get the last 3 rows of the file
    with open(filename, 'r', encoding='utf8') as file:
        last_lines = tailer.tail(file, 3)

    # clean up last line for line feed carriage returns etc by checking if line has a string
    while last_lines[-1] == '':
        last_lines = last_lines[:-1]

    df_end = pd.read_csv(io.StringIO('\n'.join(last_lines[1:])), header=None)

    # Add column names
    df_end.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_end['Datetime'] = df_end['Date'].astype(str) + ' ' + df_end['Time'].astype(str)
    df_end['Datetime'] = pd.to_datetime(df_end['Datetime'], format='%Y%m%d %H:%M:%S')

    # first timestamp in the file and last timestamp in the file
    start_date = df_start['Datetime'].iloc[0]
    end_date = df_end['Datetime'].iloc[-1]

    return start_date, end_date


def get_percentage_complete(start, end, tick_datetime):
    total_period = end - start
    period_complete = tick_datetime - start
    pct_complete = (period_complete / total_period) * 100

    return pct_complete


def main():
    # get input variables
    input_vars = get_input_variables()

    for darwinex_id, filename in input_vars['run_details'].items():

        # get filenames
        input_filename = get_in_filepath(input_vars['dir_name'], filename)
        output_filename = get_out_filepath(input_vars['dir_name'], filename)

        # get the start and end dates of the data so % complete can be determined
        start_date, end_date = get_start_end_date(input_filename)

        # get symbol quote hours
        symbol_quote_hours = darwinex_quote_hours[darwinex_id]

        # initialise list
        temp_list = []

        # start timing the conversion
        before_process = datetime.now()

        # initialise counters and mod string
        ticks_removed_count = 0
        mod_string = ''
        file_converted = False
        percentage_complete = 0

        # if processing is required, open input and output files and process as required
        if input_vars['remove_out_of_hours_ticks'] or input_vars['round_ohlc_required']:
            file_converted = True
            with open(input_filename, 'r', newline='') as f_in:
                with open(output_filename, 'w', newline='') as f_out:
                    # set up reader and writer buffers
                    reader = csv.reader(f_in)
                    writer = csv.writer(f_out)
                    # for each line check whether datetime is within hours. If it is keep line, if not skip
                    for idx, line in enumerate(reader):
                        tick_datetime = datetime.strptime(line[0] + ' ' + line[1], '%Y%m%d %H:%M:%S')
                        if not remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
                            # keep line
                            # convert OHLC values if required
                            if input_vars['round_ohlc_required']:
                                digits_round = darwinex_symbol_specs[darwinex_id]['digits']
                                line = round_ohlc(line, digits_round, input_vars)
                            # write line to new file
                            writer.writerow(line)
                        else:
                            ticks_removed_count += 1
                        # determine and output the % complete after every 1000 lines
                        if idx % 1000 == 0:
                            percentage_complete_new = get_percentage_complete(start_date, end_date, tick_datetime)
                            if int(percentage_complete_new) != int(percentage_complete):
                                percentage_complete = int(percentage_complete_new)
                                print(f'{input_filename} % Complete: {percentage_complete:.0f}%', end='\r')

        # calculate conversion time
        after_process = datetime.now()
        conversion_time = after_process - before_process

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update tick removal modification string
        if input_vars['remove_out_of_hours_ticks']:
            mod_string = mod_string + (f'{newline}***Tick Removal***\n'
                                       f'{ticks_removed_count} ticks have been removed')
        else:
            mod_string = mod_string + f'{newline}***Tick removal NOT requested***'

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update rounding modification string
        if input_vars['round_ohlc_required']:
            mod_string = mod_string + (f'{newline}***OHLC values converted***\n'
                                       f'OHLC values rounded to {digits_round} digits')
            if input_vars['remove_trailing_zeros']:
                mod_string = mod_string + (f'\nOHLC values trailing zeros removed')
        else:
            mod_string = mod_string + (f'{newline}***OHLC value conversion NOT requested***')

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update case when no conversion is specified
        if not input_vars['remove_out_of_hours_ticks'] and not input_vars['round_ohlc_required']:
            mod_string = f'No Conversion performed as not requested in input parameters'

        # write conversion details to log file
        if file_converted:
            write_conversion_log(input_filename, output_filename,
                                 mod_string, conversion_time, input_vars)

            # print conversion details to the terminal
            print('----------------------------------------------------------------')
            print(f'Software Name: {software_name}')
            print(f'Software Version: {__version__}')
            print(f'Date/Time: {datetime.now(timezone.utc)}')
            print('Conversion Details')
            print(f'input file = {input_filename}')
            print(mod_string)
            print(f'output file = {output_filename}')
            print(f'Conversion Duration: {conversion_time}')
            print('----------------------------------------------------------------')
        else:
            print(f'input file = {input_filename} - {mod_string}')


if __name__ == '__main__':
    main()
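A likely hot spot in the code above is remove_out_of_hours_tick. For every row, it calls datetime.strptime on every quote-hour string, on top of the strptime call for the tick itself. The following is a sketch of the same check with two changes: the quote hours are parsed once up front, and the weekday is cached per date string. It assumes that darwinex_quote_hours[symbol] maps the weekday names used by get_weekday_string ('Mon', 'Tues', ...) to {'start': [...], 'end': [...]} lists of 'HH:MM' strings, as the existing code implies. All function names here are hypothetical.

```python
from datetime import datetime
from functools import lru_cache

WEEKDAYS = ['Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat', 'Sun']


def hhmm_to_seconds(hhmm: str) -> int:
    """Convert 'HH:MM' (including the special '24:00') to seconds since midnight."""
    hours, minutes = hhmm.split(':')
    return int(hours) * 3600 + int(minutes) * 60


def precompile_quote_hours(symbol_quote_hours: dict) -> dict:
    """Parse every start/end string once, up front, instead of once per row."""
    return {
        day: [(hhmm_to_seconds(s), hhmm_to_seconds(e))
              for s, e in zip(hours['start'], hours['end'])]
        for day, hours in symbol_quote_hours.items()
    }


@lru_cache(maxsize=None)
def weekday_index(date_str: str) -> int:
    """Weekday (Mon=0) of a 'YYYYMMDD' string; cached, so each date is parsed once."""
    return datetime.strptime(date_str, '%Y%m%d').weekday()


def in_quote_hours(date_str: str, time_str: str, windows: dict, offset_hours: int) -> bool:
    """True if the tick, shifted by offset_hours, falls inside any quote window."""
    hh, mm, ss = time_str.split(':')
    seconds = int(hh) * 3600 + int(mm) * 60 + int(ss) + offset_hours * 3600
    # The shift can push the tick into the next (or previous) day
    day_shift, seconds = divmod(seconds, 86400)
    day = WEEKDAYS[(weekday_index(date_str) + day_shift) % 7]
    # Assumption: a day missing from windows is treated as closed
    return any(start <= seconds <= end for start, end in windows.get(day, []))
```

In main(), windows = precompile_quote_hours(symbol_quote_hours) would be computed once per file, and the check would become if in_quote_hours(line[0], line[1], windows, input_vars['quote_broker_time_offset']):. Note that in_quote_hours returns True for ticks to keep, which is the opposite of remove_out_of_hours_tick. The % complete code still needs tick_datetime, so with this change the full timestamp would only need parsing on every 1000th line.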

Upvotes: 2

Views: 6778

Answers (2)

lezaf

Reputation: 512

I am not sure of the specific details of your task, but it seems fairly simple to me. Since you haven't provided an MRE, I will try with something pseudocode-like:

# Let input_filename be your 20gb .csv file
# Let output_filename be your filtered output file
with open(input_filename, 'r') as f_in, open(output_filename, 'w') as f_out:
    for line in f_in:
        # Abstract: with extract_time() you get the time information with any way
        line_time = extract_time(line)

        # Abstract: with satisfies_constraint you check the aforementioned dictionary
        if satisfies_constraint(dct, line_time):
            f_out.write(line)

This way you read the input file line by line in O(N), check the dictionary in O(1) per line, and never load the entire file into memory.
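The following is one possible concrete version of the pseudocode. It assumes the row layout from the question's code (YYYYMMDD,HH:MM:SS,bid,ask,last,volume, with no header) and a hypothetical dictionary holding a single zero-padded 'HH:MM:SS' start/end pair. Writing the original line back, rather than re-serialising parsed fields, also skips the csv writer's work.

```python
def extract_time(line: str) -> str:
    """Return the HH:MM:SS field (the second column) of a raw CSV line."""
    # maxsplit=2 stops splitting once the time field has been isolated
    return line.split(',', 2)[1]


def satisfies_constraint(dct: dict, line_time: str) -> bool:
    """Hypothetical rule: keep the line if dct['start'] <= time <= dct['end']."""
    # Zero-padded HH:MM:SS strings sort in time order, so no parsing is needed
    return dct['start'] <= line_time <= dct['end']


def filter_file(input_filename: str, output_filename: str, dct: dict) -> None:
    # newline='' on both files keeps the original line endings untouched
    with open(input_filename, 'r', newline='') as f_in, \
            open(output_filename, 'w', newline='') as f_out:
        for line in f_in:
            if satisfies_constraint(dct, extract_time(line)):
                f_out.write(line)  # copy the untouched line, newline included
```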

What is the issue with this simple way?

Update:

I updated my initial answer to be algorithmic enough, without focusing on exactly how you implement the individual parts (for example, whether you split the line yourself or read it with csv). IMO the question concerns the basic procedure for completing the task.

Upvotes: 1

Zach Young

Reputation: 11223

A basic pattern for filtering a CSV looks like:

import csv

with open("input.csv", newline="") as f_in, open("output.csv", "w", newline="") as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)

    writer.writerow(next(reader))  # transfer header, if you have one

    for row in reader:
        if meets_my_condition(row):
            writer.writerow(row)

For simple row filtering, this is about as fast and memory-efficient as plain Python gets. The reader iterates one row at a time, so no more than one row is in memory at once. The file reads and writes are buffered, so the IO overhead is as low as your system and Python allow. Any other framework or library (Dask, etc.) will probably add some overhead compared to this, and you should never use Pandas to iterate over rows.

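Based on my understanding of your screenshot and description, meets_my_condition needs to check whether the value in column 2 is within a range of times. Because the times are zero-padded HH:MM:SS strings, they can be compared as plain strings without parsing; that could look like: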

def meets_my_condition(row: list[str]) -> bool:
    return row[1] >= "00:00:00" and row[1] <= "12:00:00"

Given this input CSV:

Col1,Col2
a,01:00:00
b,10:00:00
c,13:00:00
d,07:00:00
e,23:00:00
f,00:00:00

I get this output CSV:

Col1,Col2
a,01:00:00
b,10:00:00
d,07:00:00
f,00:00:00
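Plain string comparison like this is only safe when every time has the same zero-padded width, because strings compare character by character. The following small, self-contained check runs the same filter on in-memory rows with io.StringIO. It also shows two inputs where string order and time order disagree. The helper filter_csv_text is a hypothetical name used only for this demonstration.

```python
import csv
import io


def meets_my_condition(row: list) -> bool:
    # Lexicographic comparison: correct only for zero-padded HH:MM:SS strings
    return "00:00:00" <= row[1] <= "12:00:00"


def filter_csv_text(text: str) -> str:
    """Apply meets_my_condition to CSV text (header kept) and return the result."""
    reader = csv.reader(io.StringIO(text))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(next(reader))  # transfer header
    for row in reader:
        if meets_my_condition(row):
            writer.writerow(row)
    return out.getvalue()


sample = "Col1,Col2\na,01:00:00\nb,10:00:00\nc,13:00:00\nf,00:00:00\n"
print(filter_csv_text(sample))

# Cases where string order and time order disagree:
print("9:00:00" <= "12:00:00")  # False: '9' sorts after '1'
print("00:00" >= "00:00:00")    # False: a prefix sorts before the longer string
```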

I've profiled operations like this before, and memory use barely exceeds Python's minimum footprint of 7-8MB on my laptop, so with 16GB you'll have more than enough headroom.
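A variant of the same loop hands the filtering generator straight to the csv writer's writerows method, which removes one Python-level method lookup per row. The buffering argument of open() sets the buffer size; the 1 MiB used here is an arbitrary assumption. This sketch also assumes headerless input, like the question's tick files, and filter_csv is a hypothetical name. Whether either change gains anything noticeable is worth measuring on a real file rather than assuming.

```python
import csv


def filter_csv(in_path: str, out_path: str, keep, buffer_size: int = 1 << 20) -> None:
    """Copy the rows of in_path for which keep(row) is true into out_path."""
    with open(in_path, newline="", buffering=buffer_size) as f_in, \
            open(out_path, "w", newline="", buffering=buffer_size) as f_out:
        reader = csv.reader(f_in)
        writer = csv.writer(f_out)
        # writerows consumes the generator lazily, so still only one row in memory at a time
        writer.writerows(row for row in reader if keep(row))
```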

Upvotes: 4
