Chris O'Kelly

Reputation: 1893

Parallelize/vectorize computation of combinations from Pandas Dataframe

I have a number of pickled pandas dataframes with a decent number of rows in each (~10k). One of the columns of the dataframe is a numpy ndarray of floats (yes, I specifically chose to store array data inside a single cell - I've read this may not usually be the right way to go, e.g. here, but in this case the individual values are meaningless and only the full list of values has meaning taken together, so I think it makes sense here). I need to calculate the euclidean distance between each pair of rows in the frame. I have working code for this, but I'm hoping to improve its performance, because right now it estimates that my smaller dataset will take more than a month, and I'm pretty sure it will run out of memory long before then.
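For context, here is a rough sketch of how one of these frames might be built (the column name and array length are made up for illustration; the real frames are loaded from pickles):

import numpy as np
import pandas as pd

# each row has an integer id as its index and a single cell
# holding the full float ndarray for that row
df = pd.DataFrame(
    {'val': [np.random.rand(128) for _ in range(10000)]},
    index=pd.RangeIndex(10000, name='id'))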

The code is as follows:

import pandas as pd
import sys
import getopt
import math
from scipy.spatial import distance
from timeit import default_timer as timer
from datetime import timedelta

id_column_1 = 'id1'
id_column_2 = 'id2'
distance_column = 'distance'
embeddings_column = 'val'

# where n is the size of the set
# and k is the number of elements per combination
def combination_count(n, k):
    if k > n:
        return 0
    else:
        # n! / (k! * (n - k)!)
        return math.factorial(n) // (math.factorial(k) * math.factorial(n - k))

def progress(start, current, total, id1, id2):
    if current == 0:
        print('Processing combination #%d of #%d, (%d, %d)' % (current, total, id1, id2))
    else:
        percent_complete = 100 * float(current)/float(total)
        elapsed_time = timer() - start
        avg_time = elapsed_time / current
        remaining = total - current
        remaining_time = timedelta(seconds=remaining * avg_time)
        print('Processing combination #%d of #%d, (%d, %d). %.2f%% complete, ~%.2f s/combination, ~%s remaining' % (current, total, id1, id2, percent_complete, avg_time, remaining_time))

def check_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    distances = pd.DataFrame(columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    start = timer()
    for id1 in indexes:
        for id2 in indexes:
            # id1 is always < id2
            if id1 >= id2:
                continue
            progress(start, current_combination, total_combinations, id1, id2)
            distances.loc[(id1, id2), distance_column] = distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])
            current_combination+=1

(I excluded the main() function which just pulls out args and loads in the pickled files based on them)

I've only really started working with Python recently for this task, so there's every chance I'm missing something simple - is there a good way to deal with this?

Upvotes: 0

Views: 566

Answers (2)

Chris O'Kelly

Reputation: 1893

So the solution ended up being parallelization. I wasn't able to do this with the Pandas-specific parallelization libraries, since the intended result was not a transformation of the existing cell contents, but new values that end up in a separate dataframe.

I grabbed the joblib library and took the following steps:

First, I created a function that, given two ids, returns the corresponding result row (since the worker processes cannot mutate the dataframe in the main process, we have to switch to a paradigm of generating all the data first, THEN building the dataframe):

def get_distance(df, id1, id2):
    return [id1, id2, distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])]

and applied joblib parallelization to it:

def get_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    data = Parallel(n_jobs=-1)(delayed(get_distance)(df, min(ids), max(ids)) for ids in combinations(indexes, 2))
    distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    return distances

This gave an immediate improvement in the expected time, from months to days, but I suspected that passing the full dataframe was creating significant overhead.

After modifying the function to pass in only the required values, I got another immediate improvement, down to less than a day (~20 hours):

def get_distance(id1, id2, embed1, embed2):
    return [id1, id2, distance.euclidean(embed1, embed2)]

# ...later, in get_distances()...

data = Parallel(n_jobs=-1)(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in combinations(indexes, 2))

Finally, based on joblib's docs and the fact that a significant amount of data was still being transferred to the workers, I swapped to the multiprocessing backend and saw the expected time drop further, to ~1.5 hours. (I also added the tqdm library so I could get a nicer idea of progress than what joblib provides.)

data = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in tqdm(combinations(indexes, 2), total=total_combinations))
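Putting it all together, the final get_distances ends up looking roughly like this (a sketch assuming the same column names and the combination_count helper from the question):

from itertools import combinations

from joblib import Parallel, delayed
from scipy.spatial import distance
from tqdm import tqdm
import pandas as pd

def get_distance(id1, id2, embed1, embed2):
    # one pairwise distance, returned as a plain list so the results
    # can be assembled into a dataframe once all workers are done
    return [id1, id2, distance.euclidean(embed1, embed2)]

def get_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    data = Parallel(n_jobs=-1, backend='multiprocessing')(
        delayed(get_distance)(min(ids), max(ids),
                              df.loc[ids[0], embeddings_column],
                              df.loc[ids[1], embeddings_column])
        for ids in tqdm(combinations(indexes, 2), total=total_combinations))
    distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    return distances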

Hopefully this helps someone else with their first foray into Python parallelization!

Upvotes: 1

edinho

Reputation: 406

There are some options for parallel computation on dataframes in pure Python.
The most complete may be dask.
A simpler, easier option would be pandarallel.
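For instance, pandarallel just swaps apply for parallel_apply (a minimal sketch with a made-up per-row computation; for the pairwise-combinations problem in the question the work would first have to be reshaped into a row-wise apply):

import numpy as np
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=True)

# hypothetical example: compute the norm of each row's embedding in parallel
df = pd.DataFrame({'val': [np.random.rand(128) for _ in range(1000)]})
df['norm'] = df.parallel_apply(lambda row: np.linalg.norm(row['val']), axis=1)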

Upvotes: 1
