Reputation: 503
I'm trying to optimize one piece of Software written in Python using Pandas DF . The algorithm takes a pandas DF as input, can't be distributed and it outputs a metric for each client.
Maybe it's not the best solution, but my time-efficient approach is to load all files in parallel and then build a DF for each client
This works fine BUT very few clients have really HUGE amount of data. So I need to save memory when creating their DF.
In order to do this I'm performing a groupBy() (actually a combineByKey, but logically it's a groupBy) and then for each group (aka now a single Row of an RDD) I build a list and from it, a pandas DF.
However this makes many copies of the data (RDD rows, List and pandas DF...) in a single task/node and crashes and I would like to remove that many copies in a single node.
I was thinking on a "special" combineByKey with the following pseudo-code:
def createCombiner(val):
return [val]
def mergeCombinerVal(x,val):
x.append(val);
return x;
def mergeCombiners(x,y):
#Not checking if y is a pandas DF already, but we can do it too
if (x is a list):
pandasDF= pd.Dataframe(data=x,columns=myCols);
pandasDF.append(y);
return pandasDF
else:
x.append(y);
return x;
My question here, docs say nothing, but someone knows if it's safe to assume that this will work? (return dataType of merging two combiners is not the same than the combiner). I can control datatype on mergeCombinerVal too if the amount of "bad" calls is marginal, but it would be very inefficient to append to a pandas DF row by row.
Any better idea to perform want i want to do?
Thanks!,
PS: Right now I'm packing Spark rows, would switching from Spark rows to python lists without column names help reducing memory usage?
Upvotes: 0
Views: 169
Reputation: 503
Just writing my comment as answer
At the end I've used regular combineByKey, its faster than groupByKey (idk the exact reason, I guess it helps packing the rows, because my rows are small size, but there are maaaany rows), and also allows me to group them into a "real" Python List (groupByKey groups into some kind of Iterable which Pandas doesn't support and forces me to create another copy of the structure, which doubles memory usage and crashes), which helps me with memory management when packing them into Pandas/C datatypes.
Now I can use those lists to build a dataframe directly without any extra transformation (I don't know what structure is Spark's groupByKey "list", but pandas won't accept it in the constructor).
Still, my original idea should have given a little less memory usage (at most 1x DF + 0.5x list, while now I have 1x DF + 1x list), but as user8371915 said it's not guaranteed by the API/docs..., better not to put that into production :)
For now, my biggest clients fit into a reasonable amount of memory. I'm processing most of my clients in a very parallel low-memory-per-executor job and the biggest ones in a not-so-parallel high-memory-per-executor job. I decide based on a pre-count I perform
Upvotes: 1