Reputation:
I am trying to do some operations on a pyspark dataframe. The dataframe looks something like this:
user    domain1    domain2    ...    domain100    conversions
abcd       1          0       ...        0             1
gcea       0          0       ...        1             0
 ...      ...        ...      ...       ...           ...
The code I use works perfectly fine for further operations on the above dataframe when the dataframe is small; for example, it works for a dataframe of the following shape:
(148457,41)
But if I increase the size of the dataframe to, for example:
(2184934,324)
I cannot proceed, because the notebook times out or throws a session-timeout error as soon as I execute any kind of code on the dataframe; even something like a count() operation times out. This is what the timeout message looks like:
An error was encountered:
Invalid status code '400' from
https://172.31.12.103:18888/sessions/5/statements/20 with error
payload:
"requirement failed: Session isn't active."
The timeout happens within 1 or 2 seconds (it does not take long to time out).
I am not using collect() or any toPandas() operations that would explain the timeout. What I'm trying to do with the above dataframe is undersample the data, but I can't even get a simple .count() operation to work once the dataframe size is increased.
I have already tried different instance types in my EMR cluster. For the smaller dataframe a c5.2xlarge instance is enough, but for the larger dataframe it doesn't work even with c5.18xlarge instances. I have 1 master node and 2 slave nodes in my cluster.
This is what I'm trying to do to the dataframe:
# Undersampling: keep all rows of the base class and sample the other class
# down so the sampled negatives number roughly ratio * positives.
from pyspark.sql.functions import col

def resample(base_features, ratio, class_field, base_class):
    pos = base_features.filter(col(class_field) == base_class)   # base (positive) class
    neg = base_features.filter(col(class_field) != base_class)   # other (negative) class
    total_pos = pos.count()
    total_neg = neg.count()
    fraction = float(total_pos * ratio) / float(total_neg)
    sampled = neg.sample(True, fraction)                          # sample negatives with replacement
    return sampled.union(pos)

undersampled_df = resample(df, 10, 'conversions', 1)
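For reference, this is a small variant I am considering, which caches base_features so the two count() jobs and the sample() do not each rescan the source data. The cache() call is an assumption on my part, not something I have verified fixes the timeout:
# Assumed variant: cache the input so the repeated jobs reuse it.
from pyspark.sql.functions import col

def resample_cached(base_features, ratio, class_field, base_class):
    base_features.cache()   # keep the data around across the count/sample jobs
    pos = base_features.filter(col(class_field) == base_class)
    neg = base_features.filter(col(class_field) != base_class)
    fraction = float(pos.count() * ratio) / float(neg.count())
    return neg.sample(True, fraction).union(pos)

undersampled_df = resample_cached(df, 10, 'conversions', 1)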
How can I solve this issue? Any suggestions on what steps I should take?
Upvotes: 5
Views: 1989
Reputation: 1544
I had the same issue, and for me increasing the driver memory through Spark magic worked. By default the driver memory is 1000M when you create a Spark application through JupyterHub. Just do:
%%configure -f
{"driverMemory": "6000M"}
This will restart the Spark application, and you can see the updated driver memory by running:
spark.sparkContext.getConf().get('spark.driver.memory')
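If driver memory alone is not enough, the same %%configure JSON also accepts other Livy session parameters such as executorMemory, executorCores and numExecutors; the values below are only an illustration, not settings I have tested on this workload:
%%configure -f
{"driverMemory": "6000M", "executorMemory": "6000M", "executorCores": 4, "numExecutors": 4}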
Hope it helps.
Upvotes: 1