tf.antunes52

Reputation: 21

OutOfMemoryError while using a Jupyter notebook with Spark

I am currently using IBM Data Scientist Workbench with Jupyter notebooks and Spark.

I am trying to read several CSV files into a DataFrame and then apply some transformations to build a final DataFrame with the merged data from the different CSV files, but for some reason I am getting this error:

Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuilder.append(StringBuilder.java:132)

The code I am using is as follows:

import os

from pyspark.sql.types import StructType

i = 0
count = 0
var_name = []

# Empty DataFrames used as placeholders before the loop fills them in
schema = StructType([])
df1 = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_ocurrences = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_count = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_merged = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_complete = sqlContext.createDataFrame(sc.emptyRDD(), schema)
FINAL = sqlContext.createDataFrame(sc.emptyRDD(), schema)

for file in os.listdir('/resources/data/test_variables/'):

    # READ ONE CSV FILE INTO A DATAFRAME
    df1 = sqlContext.read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("/resources/data/test_variables/" + file)

    # SKIP SERIES WITH ONLY 0s
    count = df1.groupBy().sum("Bit_value")
    if count.select("sum(Bit_value)").collect()[0][0] == 0:
        continue

    i+=1

    # AGGREGATION: bucket the timestamp into 1-second intervals
    # (the two 1s are the interval size in seconds)
    df1 = df1.withColumn("Interval", ((df1.Timestamp.cast("long") / 1).cast("long") * 1).cast("timestamp"))
    # COUNT 1s
    df1_ocurrences = df1.groupBy("Interval").sum("Bit_value").sort("Interval")
    df1_ocurrences = df1_ocurrences.withColumnRenamed("sum(Bit_value)", "Sum_df1")
    # COUNT TOTAL
    df1_count = df1.groupBy("Interval").count().sort("Interval")
    df1_count = df1_count.withColumnRenamed("count", "Total_df1")
    # MERGING: combine the two counts, then use the file name (minus extension) as the new column name
    df1_merged = df1_ocurrences.join(df1_count, ["Interval"]).sort("Interval")
    var_name = file.split(".")
    df1_complete = df1_merged.withColumn(var_name[0], df1_merged.Sum_df1 / df1_merged.Total_df1)
    df1_complete = df1_complete.drop('Sum_df1')
    df1_complete = df1_complete.drop('Total_df1')

    # FINAL DATAFRAME: join this file's column onto the running result
    if i == 1:
        FINAL = df1_complete
    else:
        FINAL = FINAL.join(df1_complete, ["Interval"]).sort("Interval")

Any advice on this? Maybe I am not writing the most efficient code, but I am new to Spark.

Upvotes: 2

Views: 1233

Answers (1)

Dimon Buzz

Reputation: 1298

Too much time is being spent on GC and too little memory is being freed up: https://developer.ibm.com/hadoop/2016/02/16/beginners-guide-apache-spark-troubleshooting/ In addition to the recommendations in the article above, what worked for me in Jupyter is this:

spark = SparkSession.builder \
    .appName("GBT Model") \
    .config("spark.executor.memory", "2000mb") \
    .master("local[*]") \
    .config("spark.executor.cores", "4") \
    .config("spark.yarn.executor.memoryOverhead",200) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.default.parallelism", "4") \
    .getOrCreate()

Note that spark.yarn.executor.memoryOverhead is set to 10% of the executor memory (10% of 2000 MB = 200 MB).
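If it helps, here is a minimal sketch of the same builder with the executor memory and the 10% overhead derived from a single value so they stay consistent (the variable names are my own, not from the article; adjust the numbers to your environment):

from pyspark.sql import SparkSession

# Assumed values for illustration only
executor_memory_mb = 2000                              # executor memory in MB
memory_overhead_mb = int(executor_memory_mb * 0.10)    # 10% overhead -> 200 MB

spark = SparkSession.builder \
    .appName("GBT Model") \
    .master("local[*]") \
    .config("spark.executor.memory", "%dm" % executor_memory_mb) \
    .config("spark.executor.cores", "4") \
    .config("spark.yarn.executor.memoryOverhead", memory_overhead_mb) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.default.parallelism", "4") \
    .getOrCreate()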

Upvotes: 1
