Nacho

Reputation: 832

Efficiently merging a large number of pyspark DataFrames

I'm trying to perform a DataFrame union of thousands of DataFrames stored in a Python list. I'm using two approaches I found: the first does the union in a for loop, and the second uses functools.reduce. Both of them work well for toy examples, but for thousands of DataFrames I'm experiencing severe overhead, probably caused by code outside the JVM sequentially appending one DataFrame at a time (with both merging approaches).

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

# The reduce approach
def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

df_list = [td2, td3, td4, td5, td6, td7, td8, td9, td10]
df = unionAll(df_list)

# The loop approach
df = df_list[0].union(df_list[1])
for d in df_list[2:]:
    df = df.union(d)

The question is how to perform this multi-DataFrame operation efficiently, probably by circumventing the overhead caused by merging the DataFrames one by one.

Thank you very much

Upvotes: 2

Views: 2715

Answers (1)

cs95

Reputation: 403120

You are currently unioning your DataFrames like this:

(((td1 + td2) + td3) + td4)

At each stage, you are concatenating a huge DataFrame with a small one, resulting in a copy at each step and a lot of wasted memory. I would suggest combining them like this instead:

(td1 + td2) + (td3 + td4)

The idea is to iteratively coalesce pairs of roughly the same size until you are left with a single result. Here is a prototype:

def pairwise_reduce(op, x):
    # Repeatedly merge adjacent pairs until a single element remains.
    while len(x) > 1:
        # Combine elements pairwise: (x[0], x[1]), (x[2], x[3]), ...
        v = [op(i, j) for i, j in zip(x[::2], x[1::2])]
        # If the count is odd, fold the leftover element into the last result.
        if len(x) % 2 == 1:
            v[-1] = op(v[-1], x[-1])
        x = v
    return x[0]

result = pairwise_reduce(DataFrame.unionAll, df_list)
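
For completeness, here is a minimal, self-contained sketch of how this wires up end to end. The local SparkSession and the toy single-column DataFrames are made up for illustration, and DataFrame.union is the Spark 2.0+ name for unionAll:

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical stand-ins for td2, td3, ...: many small DataFrames
# sharing the same schema.
df_list = [spark.createDataFrame([(i,)], ["value"]) for i in range(1000)]

# The balanced pairwise union builds a plan tree of depth O(log n)
# instead of a chain of depth O(n), which keeps driver-side plan
# analysis from blowing up.
result = pairwise_reduce(DataFrame.union, df_list)
result.count()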

You can see what a huge difference this makes for plain Python lists:

from functools import reduce 
from operator import add

x = [[1, 2, 3], [4, 5, 6], [7, 8], [9, 10, 11, 12]] * 1000

%timeit sum(x, [])
%timeit reduce(add, x)
%timeit pairwise_reduce(add, x)

64.2 ms ± 606 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
66.3 ms ± 679 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
970 µs ± 9.02 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

sum(x, []) == reduce(add, x) == pairwise_reduce(add, x)
# True
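
To see why, note that the left fold (sum or reduce) re-copies the ever-growing accumulator on every step, so concatenating n lists of average length m moves on the order of n² · m elements, while the pairwise scheme copies each element only about log2(n) times. A rough, hypothetical copy counter (not from the original answer) makes the gap concrete for the 4000 lists above:

def copies_left_fold(n, m):
    # Each step copies the whole accumulator plus one new list of length m.
    total, acc = 0, m
    for _ in range(n - 1):
        total += acc + m
        acc += m
    return total

def copies_pairwise(n, m):
    # Each round roughly halves the list count and copies all
    # n * m elements once; there are about log2(n) rounds.
    total, k = 0, n
    while k > 1:
        total += n * m
        k = (k + 1) // 2
    return total

copies_left_fold(4000, 3)   # 24005997 element copies
copies_pairwise(4000, 3)    # 144000 element copies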

Upvotes: 8
