gunj_desai

Reputation: 782

Spark UI stuck while attempting to create Dynamic Dataframes

I am using Spark (2.2.0) with Elasticsearch Hadoop (7.6.0). The purpose of my Spark job is to process records from a Parquet file and append them, keyed by a unique identifier, to documents already present in Elasticsearch. Since Elasticsearch doesn't support updates, the job itself handles fetching the existing records and updating them.

I have approximately 20 million records in the index. At any point in time I don't need all of them, so I use a filter pushdown to fetch only the documents that are needed.

For performance reasons, the maximum number of terms you can push down in a single filter is 65536. I raised it to 100K but didn't go further, because the average number of records fetched is between 2 and 3 million.

So the aim was to create DataFrames that each fetch 100K records per request, and combine them using union.

My code for that part is as follows:

import sparkSession.implicits._   // needed for the $"col" syntax below

// DataFrame backed by the Elasticsearch index
val df = sparkSession
  .sqlContext
  .read
  .format("es")
  .load(index)

// filter._1 is the field name, filter._2 is the full list of key values to fetch
val CURSOR_SIZE = 100000
val cursors = filter._2.grouped(CURSOR_SIZE)

// One filtered DataFrame per cursor, combined into a single DataFrame with union
cursors
  .map(cursor => df.filter($"${filter._1}".isin(cursor: _*)))
  .reduce(_ union _)

With the above code, the Spark UI gets stuck, with no tasks launched after collect() completes, until I eventually get an OOM error. [Screenshot: Spark UI stuck]

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
    at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1883)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1792)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1190)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:136)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
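
From the trace, the OOM seems to happen on the driver while Spark is serializing the task closure (ClosureCleaner$.ensureSerializable), presumably because every cursor's 100K keys end up embedded as literals in the unioned plan. In case it helps reproduce, here is a minimal, self-contained sketch of the same union-of-isin-filters pattern, using an in-memory DataFrame with dummy column/key names and sizes instead of the Elasticsearch index:

import org.apache.spark.sql.SparkSession

object UnionOfIsInFilters {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("union-of-isin-filters-repro")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the Elasticsearch-backed DataFrame (dummy column name "doc_id")
    val df = (1 to 500000).toDF("doc_id")

    // Keys to fetch, split into fixed-size cursors, exactly like the job does
    val CURSOR_SIZE = 100000
    val keys = 1 to 300000
    val cursors = keys.grouped(CURSOR_SIZE)

    // One filtered DataFrame per cursor, combined with union
    val combined = cursors
      .map(cursor => df.filter($"doc_id".isin(cursor: _*)))
      .reduce(_ union _)

    println(combined.count())
    spark.stop()
  }
}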

In an attempt to parallelise this using an RDD, I've tried the following:

sparkSession
  .sparkContext
  .parallelize(cursors.toSeq)   // materialise the cursor groups so they can be parallelised
  .map(cursor => df.filter($"${filter._1}".isin(cursor: _*)))
  .reduce(_ union _)

which throws a NullPointerException

I understand the problem with the second approach: DataFrames and RDDs are abstractions that live on the driver, so executors can't operate on them.
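
To make that concrete, this is a rough sketch of where I understand the line to be (cursorRdd is just an illustrative name): parallelize can happily distribute the cursor key lists themselves, because they are plain serialisable data, but it cannot distribute operations on df, because the DataFrame's reference to the SparkSession is transient and only exists on the driver:

// Fine: the cursor key lists are plain, serialisable data
val cursorRdd = sparkSession.sparkContext.parallelize(cursors.toSeq)

// Not fine: this closure drags df along with it; on the executors df's
// SparkSession reference deserialises to null, hence the NullPointerException
cursorRdd.map(cursor => df.filter($"${filter._1}".isin(cursor: _*)))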

But after trying all of this, I am out of ideas on what else to try. I would really appreciate it if someone could point me in the right direction.

Thank you!!

UPDATE: Updated the code snippet to focus more closely on the problem.

Upvotes: 2

Views: 135

Answers (0)
