Parzival

Reputation: 2064

Outer join in Python for thousands of large tables

So, I have some 4,000 CSV files and I need to outer join all of them. Each file has two columns (a string and a float) and between 10,000 and 1,000,000 rows, and I want to join by the first column (i.e., the string variable).

I tried numpy.lib.recfunctions.join_by, but that was painfully slow. I switched to pandas.merge and that was a lot faster, but still too slow given the number (and size) of tables that I have. And it seems really memory-intensive - to the point where the machine becomes unusable when the file being merged has hundreds of thousands of rows (I'm mostly using a MacBook Pro, 2.4 GHz, 4 GB of RAM).

So now I'm looking for alternatives - are there other potential solutions that I'm missing? What other outer join implementations exist for Python? Is there a paper/site somewhere that discusses and compares the time complexity of each implementation? Would it be more efficient if I simply had Python call, say, sqlite3, and then have sqlite3 do the join? Is the string key the issue? If I could use a numerical key instead, would it be any faster?
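For concreteness, the sqlite3 route I have in mind would be something along these lines (just a sketch for two tables, with made-up table/column names; as far as I can tell sqlite3 has no FULL OUTER JOIN, so it would have to be emulated with LEFT JOIN plus UNION):

import sqlite3

conn = sqlite3.connect('/Users/username/data/merged.db')
cur = conn.cursor()
cur.execute('CREATE TABLE t1 (mykey TEXT PRIMARY KEY, val1 REAL)')
cur.execute('CREATE TABLE t2 (mykey TEXT PRIMARY KEY, val2 REAL)')
# ... bulk-load each CSV with cur.executemany('INSERT INTO t1 VALUES (?, ?)', rows) ...

# emulate a full outer join of t1 and t2 on the string key
rows = cur.execute('''
    SELECT t1.mykey, t1.val1, t2.val2
    FROM t1 LEFT JOIN t2 USING (mykey)
    UNION ALL
    SELECT t2.mykey, NULL, t2.val2
    FROM t2 LEFT JOIN t1 USING (mykey)
    WHERE t1.mykey IS NULL
''').fetchall()
conn.close()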

In case it helps give you a more concrete idea of what I'm trying to achieve, here is my code using pandas.merge:

import os
import pandas as pd

def load_and_merge(file_names, path_to_files, columns):
    '''
    seq, str, dict -> pandas.DataFrame
    '''
    output = pd.DataFrame(columns = ['mykey']) # initialize output DataFrame
    for file in file_names:

        # load new data
        new_data = pd.read_csv(path_to_files + file,
                               usecols = list(columns.keys()),
                               dtype = columns,
                               names = ['mykey', file.replace('.csv', '')],
                               header = None)

        # merge with previous data
        output = pd.merge(output, new_data, on = 'mykey', how = 'outer')
        output = output.fillna(0) # kill NaNs

    return output

path = '/Users/username/data/'
files_list = [file for file in os.listdir(path) if file.endswith('.csv')]
merged_table = load_and_merge(files_list, path, {0: 'S30', 1: 'float'})

(Mac OS X 10.6.8 & Python 2.7.5; Ubuntu 12.04 & Python 2.7.3)

Upvotes: 7

Views: 2423

Answers (2)

Parzival

Reputation: 2064

OK, here is a partial implementation of Jeff's approach (see his answer below), if I understood it correctly. I'm posting this in case anyone else is trying to do something similar, and also in case anyone can help improve or "prettify" this code (right now it's one long, ugly stream of code... I guess I should probably modularize it somehow).

It's a partial implementation because I didn't parallelize the merges. I tried using Python's multiprocessing module, but apparently I don't have enough memory for that - two simultaneous processes were enough to freeze everything (or maybe I just did something utterly idiotic - very possible, as I had never used multiprocessing before). But the rest is here: hierarchical merges and HDF5 (to store the intermediate files).

#!/usr/bin/env python

import os
import sys
import pandas as pd

def merge_csv(folder, cols_info):
    '''
    str, dict -> pandas.DataFrame

    This function outer joins all CSV files in the specified folder. It 
    joins them hierarchically and stores the intermediate files in an HDF5 
    container. The first parameter is the path to the folder and the second 
    parameter is a dictionary mapping each column to the corresponding data 
    type. You can only specify two columns.

    Example: 

    merge_csv('/Users/username/data/', {0: 'S30', 2: 'float'})

    Dependencies:

    - HDF5
    - PyTables
    - pandas
    '''

    # ensure that user is specifying only two columns
    if len(cols_info) != 2:
        sys.exit('Error: You can only specify two columns.')

    # ensure that path to input folder ends with '/'
    folder = folder + '/' if folder[-1] != '/' else folder

    # create HDF5 file to store intermediate files
    store = pd.HDFStore(folder + 'store.h5', mode = 'w')

    # load CSV files and write data to HDF5 file
    flist = [file for file in os.listdir(folder) if file[-4:] == '.csv']
    if len(flist) == 0:
        sys.exit('There are no CSV files in the specified folder.')
    for file in flist:
        case = file.replace('.csv', '')
        store[case] = pd.read_csv(folder + file, 
                                  usecols = [col for col in cols_info], 
                                  names = ['key', case], 
                                  dtype = cols_info)
        store.flush()

    # start merge routine
    flist = store.keys()
    counter = 0
    while len(flist) > 1:
        counter += 1

        # merge current set of files, two by two
        length = (len(flist) - 1) if (len(flist) % 2 == 1) else len(flist)
        for i in range(0, length, 2):
            merged_data = pd.merge(store[flist[i]], store[flist[i + 1]], 
                                   on = 'key', 
                                   how = 'outer',
                                   sort = False)
            outputfile = 'file' + str(i) + str(i + 1)

            # if number of files is odd, make last pair a trio
            if (i == len(flist) - 3) and (len(flist) % 2 == 1):
                merged_data = pd.merge(merged_data, store[flist[i + 2]], 
                                       on = 'key',
                                       how = 'outer', 
                                       sort = False)
                outputfile += str(i + 2)

            # save merged pair (or trio) to HDF5 file
            merged_data = merged_data.fillna(0)
            store.put('/tmp' + str(counter) + '/' + outputfile, merged_data)
            store.flush()

        # clean up
        to_remove = [file for file in store.keys() 
                     if 'tmp' + str(counter) + '/' not in file]
        for file in to_remove:
            store.remove(file)

        # move on to next set of intermediate files
        flist = store.keys()

    # wrap up
    store.close()
    return merged_data

EDIT:

Still no good: the intermediate matrices eventually get too big, exceed the computer's memory, and the code crashes (tables.exceptions.HDF5ExtError: Problems creating the Array.). I've tried sqlite3, but that didn't work either, so I guess I'll just have to keep looking.

Upvotes: 0

Jeff

Reputation: 129018

Here is how I would approach this problem.

Don't merge iteratively. You are merging a smallish frame (call this the 'mergee') with a larger frame (call this the 'merger'), then repeating this, which causes the 'merger' to keep getting bigger and accumulating more rows.

Instead you can do repeated hierarchical merges. Say you number the mergees 1-4000.

merge 1 and 2 to form 1_2, merge 3 and 4 to form 3_4, and so on

Then repeat at the next level: merge 1_2 and 3_4 to form 1_2_3_4, and so on, until a single frame remains

This doesn't change the total amount of work you are doing, but it makes each individual merge much simpler, which lowers the memory barrier and the time spent (since each merge has to go through the cartesian product of the keys). It may make sense to randomize the merge order.
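A bare-bones sequential version of the idea might look something like this (just a sketch; it assumes the frames have already been loaded into a list and share the key column 'mykey'):

import pandas as pd

def hierarchical_outer_merge(frames, key='mykey'):
    # frames: list of DataFrames, each with the key column plus one value column
    while len(frames) > 1:
        next_round = []
        # merge the current frames two by two
        for i in range(0, len(frames) - 1, 2):
            next_round.append(pd.merge(frames[i], frames[i + 1],
                                       on=key, how='outer'))
        # an odd frame at the end is simply carried over to the next round
        if len(frames) % 2 == 1:
            next_round.append(frames[-1])
        frames = next_round
    return frames[0]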

In addition, this is completely parallelizable, in that you can have independent processes work on this problem, generating the intermediate merges. This essentially becomes a map-reduce type of problem.

You can also store the intermediate merges in HDF5 files (using HDFStore), which will make the storage quite efficient. Note that these should be SEPARATE files, to avoid writing to the same file from multiple processes (which is not supported by HDF5).
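For instance, one round of parallel pairwise merges, with each worker writing its result to its own HDF5 file, could be sketched roughly as follows (illustrative only; the 'data' key, the output naming, and merge_pair itself are made up for the example):

import pandas as pd
from multiprocessing import Pool

def merge_pair(args):
    # each worker reads two stores and writes ONE separate HDF5 file
    left_path, right_path, out_path = args
    left = pd.HDFStore(left_path, mode='r')
    right = pd.HDFStore(right_path, mode='r')
    merged = pd.merge(left['data'], right['data'], on='mykey', how='outer')
    left.close()
    right.close()
    out = pd.HDFStore(out_path, mode='w')
    out['data'] = merged
    out.close()
    return out_path

def merge_round(paths, n_workers=2):
    # pair the files up; an odd file at the end is carried over unchanged
    jobs = [(paths[i], paths[i + 1], paths[i].replace('.h5', '_m.h5'))
            for i in range(0, len(paths) - 1, 2)]
    pool = Pool(n_workers)
    merged_paths = pool.map(merge_pair, jobs)
    pool.close()
    pool.join()
    if len(paths) % 2 == 1:
        merged_paths.append(paths[-1])
    return merged_paths

Calling merge_round repeatedly on its own output would reproduce the hierarchical scheme above, one level per call.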

Upvotes: 7
