Reputation: 832
I'm trying to union thousands of DataFrames stored in a Python list. I'm using two approaches I found: the first is a for loop over union, the second uses functools.reduce. Both work well for toy examples, but for thousands of DataFrames I'm experiencing severe overhead, probably caused by code outside the JVM, since each DataFrame is appended sequentially, one at a time (with both merging approaches).
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

# The reduce approach
def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

df_list = [td2, td3, td4, td5, td6, td7, td8, td9, td10]
df = unionAll(df_list)

# The loop approach
df = df_list[0].union(df_list[1])
for d in df_list[2:]:
    df = df.union(d)
The question is how to perform this multiple-DataFrame union efficiently, probably by circumventing the overhead caused by merging the DataFrames one by one.
Thank you very much.
Upvotes: 2
Views: 2715
Reputation: 403120
You are currently unioning your DataFrames like this:
(((td1 + td2) + td3) + td4)
At each stage, you are concatenating a huge DataFrame with a small one, resulting in a copy of the accumulated data at each step and a lot of wasted memory. I would suggest combining them like this:
(td1 + td2) + (td3 + td4)
The idea is to iteratively coalesce pairs of roughly the same size until you are left with a single result. Here is a prototype:
def pairwise_reduce(op, x):
    while len(x) > 1:
        # Combine adjacent pairs; if len(x) is odd, x[-1] is left unpaired.
        v = [op(i, j) for i, j in zip(x[::2], x[1::2])]
        if len(x) % 2 == 1:
            # Fold the leftover last element into the final pair's result.
            v[-1] = op(v[-1], x[-1])
        x = v
    return x[0]
result = pairwise_reduce(DataFrame.unionAll, df_list)
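For a self-contained illustration, here is a minimal sketch of the same call against actual Spark DataFrames; the SparkSession setup and the spark.range DataFrames are stand-ins of my own invention for your df_list. The point is that the balanced combination produces a union tree of depth about log2(n) instead of a left-deep chain of depth n:

from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# Stand-in for the thousands of DataFrames from the question.
df_list = [spark.range(i * 10, (i + 1) * 10) for i in range(100)]

# Balanced union: the plan is a tree of depth ~log2(len(df_list))
# rather than a left-deep chain of depth len(df_list).
df = pairwise_reduce(DataFrame.unionAll, df_list)
print(df.count())  # 1000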
You can see how this makes a huge difference for Python lists. With a plain left-to-right reduce, the accumulated list is copied at every step, which is O(n²) total work; the pairwise scheme copies each element only about log2(n) times.
from functools import reduce
from operator import add
x = [[1, 2, 3], [4, 5, 6], [7, 8], [9, 10, 11, 12]] * 1000
%timeit sum(x, [])
%timeit reduce(add, x)
%timeit pairwise_reduce(add, x)
64.2 ms ± 606 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
66.3 ms ± 679 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
970 µs ± 9.02 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
sum(x, []) == reduce(add, x) == pairwise_reduce(add, x)
# True
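One caveat on the combiner itself (an assumption about your Spark version on my part, not something your snippet shows): since Spark 2.0, unionAll is a deprecated alias of union, and both resolve columns by position. If your DataFrames share column names but not column order, unionByName (available from Spark 2.3) is the safer operation to pass in:

# Assumption: Spark >= 2.3, where DataFrame.unionByName exists.
# union/unionAll match columns by position; unionByName matches by name.
result = pairwise_reduce(DataFrame.unionByName, df_list)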
Upvotes: 8