Srinivas

Reputation: 64

Apache Spark Scala - Hive insert into throwing a "too large frame" error

I'm trying to insert data into Hive using the code below, but it always fails with

java.lang.IllegalArgumentException: Too large frame.

I tried tweaking the memory, but that didn't help.

Here are the details.

Error stack trace:

[Stage 4:=====================================================>(999 + 1) / 1000]18/11/27 09:59:44 WARN TaskSetManager: Lost task 364.0 in stage 4.0 (TID 1367, spark-node, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame: 5587345928
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:286)
... 8 more
Caused by: java.lang.IllegalArgumentException: Too large frame: 5587345928
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more

Below is my sample code:

 //create spark session first
    val spark = SparkSession.builder()
      .appName("MSSQLIngestion")
      .master("yarn")
      .config("spark.sql.caseSensitive", "false")
      .config("spark.sql.shuffle.partitions", "1000")
      .config("spark.shuffle.spill", "true")
      .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate();

    spark.sql("set hive.exec.parallel=true")

    // Create a Properties() object to hold the parameters.
    val connectionProperties = new Properties()
    connectionProperties.setProperty("Driver", driverClass)
    connectionProperties.setProperty("fetchSize", "100000")

    // read data from JDBC server and construct a dataframe
    val jdbcDF1 = spark.read.jdbc(url = jdbcUrl, table = "(select * from jdbcTable) e", properties = connectionProperties)

    val jdbcDF = jdbcDF1.repartition(1000)

    val count = jdbcDF.count()

    println("red "+count+" records from sql server and started loading into hive")

    // if count > 0 then insert the records into Hive
    if (count > 0) {
      // create spark temporary table
      jdbcDF.createOrReplaceTempView("sparkTempTable")
      // insert into Hive external table
      spark.sql("insert into externalTable partition (hivePartitionCol) select * from sparkTempTable  distribute by  hivePartitionCol ")
    }
    println("completed the job for loading the data into hive")

    spark.stop()

Here is my spark-submit:

spark-submit --class com.generic.MSSQLHiveIngestion --master yarn --num-executors 8 --executor-cores 2 --executor-memory 16G --driver-memory 8G --driver-cores 4 --conf spark.yarn.executor.memoryOverhead=1G data-ingestion.jar

Upvotes: 3

Views: 8763

Answers (2)

Ram Ghadiyaram

Reputation: 29155

The following error occurs when the size of a shuffle data block exceeds 2 GB, which Spark cannot handle.

    Caused by: java.lang.IllegalArgumentException: Too large frame: 5211883372140375593
            at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
            at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)

Option 1 (Spark >= 2.2):

  1. Spark 2.2 and later handle this issue better than earlier versions of Spark. For details, see SPARK-19659.

  2. Use the following Spark configuration: increase spark.sql.shuffle.partitions from the default 200 to a value greater than 2001, and set spark.default.parallelism to the same value as spark.sql.shuffle.partitions (see the sketch below).
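A minimal sketch of how those two settings might look when building the session (the value 2048 is illustrative; anything above the threshold mentioned in step 2 would do):

    import org.apache.spark.sql.SparkSession

    // Sketch only: raise shuffle partitions above 2001 and keep
    // spark.default.parallelism in sync, per step 2 above.
    val spark = SparkSession.builder()
      .appName("MSSQLIngestion")
      .master("yarn")
      .config("spark.sql.shuffle.partitions", "2048")
      .config("spark.default.parallelism", "2048")
      .enableHiveSupport()
      .getOrCreate()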

Option 2:

  1. Identify the DataFrame that is causing the issue.
    a. Add a Spark action (for instance, df.count()) after creating each new DataFrame.
    b. Print something to check that the DataFrame was evaluated.
    c. If the print statement is not reached for a DataFrame, then the issue is with that DataFrame.

  2. After the DataFrame is identified, repartition it using df.repartition() and then cache it using df.cache() (see the sketch after this list).

  3. If there is skewness in the data and you are using a Spark version earlier than 2.2, then modify the code to work around the skew.
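A rough sketch of steps 1 and 2 applied to the asker's jdbcDF (the repartition count is an illustrative assumption):

    // Sketch only: force evaluation to locate the failing DataFrame,
    // then repartition and cache it before the Hive insert.
    val jdbcDF1 = spark.read.jdbc(url = jdbcUrl, table = "(select * from jdbcTable) e", properties = connectionProperties)

    println(jdbcDF1.count())   // action: if this line never prints, jdbcDF1 is the culprit

    val jdbcDF = jdbcDF1
      .repartition(2001)       // illustrative count; more partitions mean smaller shuffle blocks
      .cache()

    println(jdbcDF.count())    // materializes the cache before the insert into Hive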

Upvotes: 3

Driss NEJJAR

Reputation: 978

This problem happens because the objects being shuffled are too big.

Can you try to increase the shuffle partitions?

.config("spark.sql.shuffle.partitions", "1000")

Or can you try to add these configs:

.config("spark.shuffle.spill.compress", true)

.config("spark.shuffle.compress", true)

Or you can lower the shuffle block size to reduce shuffle memory usage (see the sketch below).
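A small sketch of how these suggestions might fit into the asker's existing session builder (the partition count is illustrative; raising it is also what effectively lowers the size of each shuffle block):

    // Sketch only: more shuffle partitions (smaller blocks) plus shuffle compression.
    val spark = SparkSession.builder()
      .appName("MSSQLIngestion")
      .master("yarn")
      .config("spark.sql.shuffle.partitions", "2000")   // illustrative; higher than the current 1000
      .config("spark.shuffle.compress", "true")
      .config("spark.shuffle.spill.compress", "true")
      .enableHiveSupport()
      .getOrCreate()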

Upvotes: 3
