user11103434
user11103434

Reputation:

EMR notebook session times out within seconds(using pyspark) on a large dataframe(pyspark)

I am trying to do some operations on a pyspark dataframe. The dataframe looks something like this:

    user    domain1    domain2 ........ domain100    conversions

    abcd      1          0     ........    0            1
    gcea      0          0     ........    1            0
     .        .          .     ........    .            .
     .        .          .     ........    .            .
     .        .          .     ........    .            .

The code I use works perfectly fine for me to further operate on the above dataframe if the dataframe is small, for example it works perfectly fine for a dataframe of the following shape:

    (148457,41)

But if I increase the size of the dataframe, to for example:

    (2184934,324)

I cannot proceed forward because the notebook times out or throws a session timeout error message as soon as i execute any kind of code on the dataframe, even something like a count() operation timesout. This is how the timeout message looks like:

    An error was encountered:
    Invalid status code '400' from 
    https://172.31.12.103:18888/sessions/5/statements/20 with error 
    payload: 
    "requirement failed: Session isn't active."

This timeout takes 1 or 2 seconds(does not take long time to timeout).

I am not using collect() or any topandas() operations for it to timeout. What I'm trying to do to the above dataframe is undersampling the data but I can't seem to make a simple .count() operation to work after the dataframe size is increased.

I have already tried using different types of instances in my emr cluster to make it work. When I use the smaller dataframe a c5.2xlarge type instance is enough, but for the larger dataframe it doesnt work even if I use c5.18xlarge instances. I have 1 master node and 2 slave nodes in my cluster.

This is what I'm trying to do to the dataframe

#Undersampling.
from pyspark.sql.functions import col, when
def resample(base_features,ratio,class_field,base_class):
    pos = base_features.filter(col(class_field)==base_class)
    neg = base_features.filter(col(class_field)!=base_class)
    total_pos = pos.count()
    total_neg = neg.count()
    fraction=float(total_pos*ratio)/float(total_neg)
    sampled = neg.sample(True,fraction)
    a = sampled.union(pos)
    return a
undersampled_df = resample(df,10,'conversions',1)

How can I solve this issue? Any suggestions on what steps I should take?

Upvotes: 5

Views: 1989

Answers (1)

Koba
Koba

Reputation: 1544

I had the same issue and for me increasing the driver memory through spark magic worked. By default the driver memory is 1000M when you create a spark application through JupyterHub. Just do

%%configure -f 
{"driverMemory": "6000M"}

It will restart the spark application and you can see the updated driver memory by doing

spark.sparkContext.getConf().get('spark.driver.memory')

Hope it helps.

Upvotes: 1

Related Questions