Siddarth

Reputation: 1020

Improving Efficiency of Reading a Large CSV File

I am using RAKE (the Rapid Automatic Keyword Extraction algorithm) to generate keywords. I have a file of around 53 million records (~4.6 GB), and I want to know the best possible way to process it.

I have RAKE nicely wrapped up in a class. Below are the approaches I have tried so far.

Approach #1:

with open("~inputfile.csv") as fd:
   for line in fd:
      keywords = rake.run(line)
      write(keywords)

This is the basic brute-force way. Assuming that writing to a file takes time, invoking it 53 million times would be costly, so I tried the approach below, which writes 100K lines to the file in one go.

Approach #2:

with open("~inputfile.csv") as fd:
   string = ''
   counter = 0
   for line in fd:
      keywords = rake.run(line)
      string = string + keywords + '\n'
      counter += 1
      if counter == 100000:
           write(string)
           string = ''
           counter = 0  # start counting the next batch
   if string:
      write(string)  # write out the final partial batch

To my surprise, Approach #2 took more time than Approach #1. I don't get it! How is that possible? Also, can you suggest a better approach?

Approach #3 (thanks to cefstat)

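with open("~inputfile.csv") as fd:
  strings = []
  counter = 0
  for line in fd:
    strings.append(rake.run(line))
    counter += 1
    if counter == 100000:
      write("\n".join(strings))
      write("\n")
      strings = []
      counter = 0  # start counting the next batch
  if strings:
    # write out the final partial batch
    write("\n".join(strings))
    write("\n")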

Runs faster than Approaches #1 & #2.

Thanks in advance!

Upvotes: 4

Views: 869

Answers (2)

skrrgwasme

Reputation: 9633

As mentioned in the comments, Python already buffers writes to files, so implementing your own buffering in Python (as opposed to C, where the built-in buffering is implemented) is going to make it slower. You can adjust the buffer size with the buffering argument to your call to open.
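
As a rough illustration, here is a minimal sketch of passing a larger buffer size to open. The 1 MB size and the write_keywords helper are assumptions made up for this example, not something from the question, and whether a bigger buffer helps at all depends on your system:

```python
BUFFER_SIZE = 1024 * 1024  # 1 MB; an arbitrary size chosen for illustration


def write_keywords(path, keyword_lines, buffer_size=BUFFER_SIZE):
    """Hypothetical helper: write one line per item and let Python's
    built-in buffering decide when data actually goes to disk."""
    # for a text-mode file, buffering > 1 sets the size in bytes of the
    # underlying binary buffer
    with open(path, "w", buffering=buffer_size) as out:
        for keywords in keyword_lines:
            out.write(keywords + "\n")
```

With this, the loop can call write once per record, and the grouping into larger disk writes happens inside the file object rather than in your own Python code.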

A different approach would be to read the file in chunks instead. The basic algorithm would be this:

  1. Iterate over the file using file.seek(x) where x = current position + desired chunk size
  2. While iterating, record the start and end byte positions of each chunk
  3. In a worker process (using multiprocessing.Pool()), read in the chunk using the start and end byte positions
  4. Each process writes to its own keyword file
  5. Reconcile the separate files. You have a few options for doing this:

    • Read the keyword files back into memory into a single list
    • If on *nix, combine keyword files using the "cat" command.
    • If you're on Windows, you could keep a list of the keyword files (instead of one file path) and iterate through those files as needed
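
If you want a single output file without depending on "cat", the reconcile step can also be done in Python. This is only a sketch using the standard library; the function name merge_keyword_files is hypothetical, and it assumes each per-worker file already ends with a newline:

```python
import shutil


def merge_keyword_files(part_paths, merged_path):
    """Hypothetical helper: concatenate the per-worker keyword files,
    in the order given, into a single file at merged_path."""
    with open(merged_path, "wb") as merged:
        for part_path in part_paths:
            with open(part_path, "rb") as part:
                # copyfileobj streams the data in blocks, so no part file
                # has to fit in memory
                shutil.copyfileobj(part, merged)
```

Binary mode is used so the bytes are copied unchanged, regardless of the encoding the workers wrote with.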

There are numerous blogs and recipes out there for reading large files in parallel:

https://stackoverflow.com/a/8717312/2615940
http://aamirhussain.com/2013/10/02/parsing-large-csv-files-in-python/
http://www.ngcrawford.com/2012/03/29/python-multiprocessing-large-files/
http://effbot.org/zone/wide-finder.htm

Side note: I once tried doing the same thing and had the same result. It also doesn't help to outsource the file writing to another thread (at least it didn't when I tried).

Here's a code snippet that demonstrates the algorithm:

import functools
import multiprocessing

BYTES_PER_MB = 1048576

# stand-in for whatever processing you need to do on each line
# for demonstration, we'll just grab the first character of every non-empty line
def line_processor(line):
    try:
        return line[0]
    except IndexError:
        return None

# here's your worker function that executes in a worker process
def parser(file_name, start, end):

    # open in binary mode so that seek() and read() work in exact byte offsets
    with open(file_name, "rb") as infile:

        # get to proper starting position
        infile.seek(start)

        # use read() to force exactly the number of bytes we want
        # (decoding as UTF-8 is an assumption; use your file's encoding)
        lines = infile.read(end - start).decode("utf-8").split("\n")

    return [line_processor(line) for line in lines]

# this function splits the file into chunks and returns the start and end byte
# positions of each chunk
def chunk_file(file_name):

    chunk_start = 0
    chunk_size = 512 * BYTES_PER_MB # 512 MB chunk size

    # binary mode keeps seek() and tell() in plain byte offsets
    with open(file_name, "rb") as infile:

        # we can't use the 'for line in infile' construct because infile.tell()
        # is not accurate during that kind of iteration

        while True:
            # move chunk end to the end of this chunk
            chunk_end = chunk_start + chunk_size
            infile.seek(chunk_end)

            # reading a line will advance the FP to the end of the line so that
            # chunks don't break lines
            line = infile.readline()

            # check to see if we've read past the end of the file
            if line == b'':
                yield (chunk_start, chunk_end)
                break

            # adjust chunk end to ensure it didn't break a line
            chunk_end = infile.tell()

            yield (chunk_start, chunk_end)

            # move starting point to the beginning of the new chunk
            chunk_start = chunk_end

    return

if __name__ == "__main__":

    pool = multiprocessing.Pool()

    keywords = []

    file_name = "your_file.csv"  # placeholder: enter your file name here

    # bind the file name argument to the parsing function so we don't have to
    # explicitly pass it every time
    new_parser = functools.partial(parser, file_name)

    # chunk out the file and launch the subprocesses in one step
    for keyword_list in pool.starmap(new_parser, chunk_file(file_name)):

        # as each list is available, extend the keyword list with the new one
        # there are definitely faster ways to do this - have a look at 
        # itertools.chain() for other ways to iterate over or combine your
        # keyword lists
        keywords.extend(keyword_list) 

    # now do whatever you need to do with your list of keywords
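
As a small sketch of the itertools.chain() idea mentioned in the comments above: chain.from_iterable() walks over the per-chunk lists one after another without first copying them into one big list. The sample lists here are made up and simply stand in for what the pool returns:

```python
from itertools import chain

# stand-in for the per-chunk lists returned by the worker pool
# (made-up values, for illustration only)
keyword_lists = [["a", "b"], ["c"], [], ["d", "e"]]

# iterate lazily over every keyword without building a combined list
all_keywords = chain.from_iterable(keyword_lists)

# materialize into a real list only if you actually need one
flat = list(all_keywords)
```

If you only need to loop over the keywords once (for example, to write them out), you can iterate over the chain object directly and skip the list() call.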

Upvotes: 3

cefstat

Reputation: 2406

Repeatedly adding strings is very slow in Python (as mentioned by jedwards). You can try the following standard alternative which should almost certainly be faster than #2 and which in my limited testing appears to be 30% faster than approach #1 (although perhaps not fast enough for your needs):

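with open("~inputfile.csv") as fd:
  strings = []
  counter = 0
  for line in fd:
    strings.append(rake.run(line))
    counter += 1
    if counter == 100000:
      write("\n".join(strings))
      write("\n")
      strings = []
      counter = 0  # start counting the next batch
  if strings:
    # write out the final partial batch
    write("\n".join(strings))
    write("\n")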
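
If you want to check the difference on your own machine, here is a rough timing sketch. The function names and the sample data are made up for illustration, and the numbers will vary with your Python version and hardware, so treat it as a way to measure rather than a guaranteed result:

```python
import timeit


def build_with_concat(pieces):
    # mirrors approach #2: each iteration builds new string objects,
    # which typically means copying the text accumulated so far
    result = ''
    for piece in pieces:
        result = result + piece + '\n'
    return result


def build_with_join(pieces):
    # mirrors the list approach: join computes the final size up front
    # and copies each piece once
    return '\n'.join(pieces) + '\n'


if __name__ == "__main__":
    # made-up sample data standing in for the output of rake.run()
    pieces = ['some extracted keywords'] * 100000
    for func in (build_with_concat, build_with_join):
        seconds = timeit.timeit(lambda: func(pieces), number=5)
        print(func.__name__, round(seconds, 3))
```

Both functions produce the same text for a non-empty list, so any difference you see is purely in how the string gets built.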

Upvotes: 3
