Reputation: 21
I am currently using IBM Data Scientist Workbench with Jupyter notebooks and Spark.
I am trying to read several CSV files into a DataFrame and then apply some transformations to it, in order to create a final dataframe with the merged data from the different CSV files, but for some reason I am getting this error:
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuilder.append(StringBuilder.java:132)
The code I am using is as follows:
import os
from pyspark.sql.types import StructType

i = 0
count = 0
var_name = []
schema = StructType([])
df1 = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_ocurrences = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_count = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_merged = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df1_complete = sqlContext.createDataFrame(sc.emptyRDD(), schema)
FINAL = sqlContext.createDataFrame(sc.emptyRDD(), schema)

for file in os.listdir('/resources/data/test_variables/'):
    df1 = sqlContext.read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("/resources/data/test_variables/" + file)

    # SKIP SERIES WITH ONLY 0s
    count = df1.groupBy().sum("Bit_value")
    if count.select("sum(Bit_value)").collect()[0][0] == 0:
        continue

    i += 1

    # AGGREGATION
    df1 = df1.withColumn("Interval", ((df1.Timestamp.cast("long") / 1).cast("long") * 1).cast("timestamp"))

    # COUNT 1s
    df1_ocurrences = df1.groupBy("Interval").sum("Bit_value").sort("Interval")
    df1_ocurrences = df1_ocurrences.withColumnRenamed("sum(Bit_value)", "Sum_df1")

    # COUNT TOTAL
    df1_count = df1.groupBy("Interval").count().sort("Interval")
    df1_count = df1_count.withColumnRenamed("count", "Total_df1")

    # MERGING
    df1_merged = df1_ocurrences.join(df1_count, ["Interval"]).sort("Interval")

    var_name = file.split(".")
    df1_complete = df1_merged.withColumn(var_name[0], df1_merged.Sum_df1 / df1_merged.Total_df1)
    df1_complete = df1_complete.drop('Sum_df1')
    df1_complete = df1_complete.drop('Total_df1')

    # FINAL DATAFRAME
    if i == 1:
        FINAL = df1_complete
    else:
        FINAL = FINAL.join(df1_complete, ["Interval"]).sort("Interval")
Any advice on this? Maybe I am not writing the most efficient code, but I am new to Spark.
Upvotes: 2
Views: 1233
Reputation: 1298
Too much time is being spent on GC and too little memory is being freed up: https://developer.ibm.com/hadoop/2016/02/16/beginners-guide-apache-spark-troubleshooting/ In addition to the recommendations in the article above, what worked for me in Jupyter is this:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GBT Model") \
    .config("spark.executor.memory", "2000mb") \
    .master("local[*]") \
    .config("spark.executor.cores", "4") \
    .config("spark.yarn.executor.memoryOverhead", 200) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.default.parallelism", "4") \
    .getOrCreate()
Note that spark.yarn.executor.memoryOverhead is set to 10% of the executor memory (200 MB for the 2000 MB executor above).
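To check that these values were actually picked up by the running session, you can print the active configuration; a minimal sketch, assuming the session object is named spark as above:

# Print the effective Spark configuration of the running session
# to confirm the memory settings above were applied.
for key, value in spark.sparkContext.getConf().getAll():
    print(key, "=", value)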
Upvotes: 1