Reputation: 64
I'm trying to insert data into Hive using the code below, but it always fails with
java.lang.IllegalArgumentException: Too large frame.
I tried tweaking the memory settings, but that didn't help.
Here are the details.
Error stack trace:
[Stage 4:=====================================================>(999 + 1) / 1000]18/11/27 09:59:44 WARN TaskSetManager: Lost task 364.0 in stage 4.0 (TID 1367, spark-node, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame: 5587345928
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:286)
... 8 more
Caused by: java.lang.IllegalArgumentException: Too large frame: 5587345928
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
Below is my sample code:
import java.util.Properties
import org.apache.spark.sql.SparkSession

// create the Spark session first
val spark = SparkSession.builder()
.appName("MSSQLIngestion")
.master("yarn")
.config("spark.sql.caseSensitive", "false")
.config("spark.sql.shuffle.partitions", "1000")
.config("spark.shuffle.spill", "true")
.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate();
spark.sql("set hive.exec.parallel=true")
// Create a Properties() object to hold the parameters.
val connectionProperties = new Properties()
connectionProperties.setProperty("Driver", driverClass)
connectionProperties.setProperty("fetchSize", "100000")
// read data from JDBC server and construct a dataframe
val jdbcDF1 = spark.read.jdbc(url = jdbcUrl, table = "(select * from jdbcTable) e", properties = connectionProperties)
val jdbcDF = jdbcDF1.repartition(1000)
val count = jdbcDF.count()
println(s"read $count records from SQL Server and started loading into Hive")
// if count > 0 then insert the records into Hive
if (count > 0) {
// create spark temporary table
jdbcDF.createOrReplaceTempView("sparkTempTable")
// insert into Hive external table
spark.sql("insert into externalTable partition (hivePartitionCol) select * from sparkTempTable distribute by hivePartitionCol ")
}
println("completed the job for loading the data into hive")
spark.stop()
Here is my spark-submit command:
spark-submit --class com.generic.MSSQLHiveIngestion --master yarn --num-executors 8 --executor-cores 2 --executor-memory 16G --driver-memory 8G --driver-cores 4 --conf spark.yarn.executor.memoryOverhead=1G data-ingestion.jar
Upvotes: 3
Views: 8763
Reputation: 29155
This error occurs when a shuffle data block exceeds the 2 GB limit that Spark can handle:
Caused by: java.lang.IllegalArgumentException: Too large frame: 5211883372140375593
at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
Spark 2.2 and later handle this issue better than earlier versions. For details, see SPARK-19659.
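As a quick sanity check, the frame size from the question's stack trace can be compared against that limit. This sketch assumes the limit is Int.MaxValue bytes (about 2 GiB), the bound the frame decoder checks against; the object and method names are hypothetical.

```scala
// Hypothetical helper: compares a "Too large frame" value against the
// assumed limit of Int.MaxValue bytes (about 2 GiB).
object FrameSizeCheck {
  val MaxFrameBytes: Long = Int.MaxValue.toLong // 2147483647 bytes

  // A frame is rejected unless it is strictly smaller than the limit.
  def exceedsLimit(frameBytes: Long): Boolean = frameBytes >= MaxFrameBytes

  // Smallest n such that frameBytes / n is strictly below the limit,
  // i.e. the minimum number of equal pieces the block would need.
  def minSplits(frameBytes: Long): Long = frameBytes / MaxFrameBytes + 1

  def main(args: Array[String]): Unit = {
    val frame = 5587345928L // value reported in the question's stack trace
    println(s"exceeds limit: ${exceedsLimit(frame)}, min splits: ${minSplits(frame)}")
  }
}
```

Running main prints "exceeds limit: true, min splits: 3". Three is only a lower bound: real shuffle blocks are rarely equal-sized, especially when keys are skewed.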
Use the following Spark configuration:
1. Change spark.sql.shuffle.partitions from the default 200 to a value greater than 2001.
2. Set spark.default.parallelism to the same value as spark.sql.shuffle.partitions.
3. Identify the DataFrame that is causing the issue:
   a. Add a Spark action (for instance, df.count()) after creating each new DataFrame.
   b. Print a message after the action to confirm that it completed.
   c. If the print statement is not executed for a DataFrame, the issue is with that DataFrame.
4. After identifying the DataFrame, repartition it with df.repartition() and then cache it with df.cache().
5. If the data is skewed and you are using a Spark version earlier than 2.2, modify the code.
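The configuration and debugging steps above could be sketched in Scala roughly as follows. This assumes Spark 2.x with spark-sql on the classpath; 2002 is only an example of "a value greater than 2001", the local[*] master is an assumption for trying it locally, and countAndReport is a hypothetical helper name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of the steps above; values and names are illustrative assumptions.
object TooLargeFrameSketch {
  val ShufflePartitions: Int = 2002

  def buildSession(): SparkSession =
    SparkSession.builder()
      .appName("TooLargeFrameSketch")
      .master("local[*]") // assumption for local testing; the question uses "yarn"
      // Step 1: raise spark.sql.shuffle.partitions above 2001.
      .config("spark.sql.shuffle.partitions", ShufflePartitions.toString)
      // Step 2: set spark.default.parallelism to the same value.
      .config("spark.default.parallelism", ShufflePartitions.toString)
      .getOrCreate()

  // Step 3 (hypothetical helper): run an action, then print a marker, so the
  // last marker printed before a failure points at the problematic DataFrame.
  def countAndReport(name: String, df: DataFrame): Long = {
    val n = df.count()
    println(s"$name: count finished with $n rows")
    n
  }

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    val df = spark.range(1000).toDF("id") // stand-in for the real DataFrame
    countAndReport("df", df)
    // Step 4: repartition the suspect DataFrame, then cache it.
    val fixed = df.repartition(ShufflePartitions).cache()
    countAndReport("fixed", fixed)
    spark.stop()
  }
}
```

In the question's job, jdbcDF is already counted before the insert. If that message prints and the job still fails, the oversized block likely comes from the shuffle introduced by "distribute by hivePartitionCol", which the stack trace also hints at (the fetch fails inside the Hive writer).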
Upvotes: 3
Reputation: 978
This problem happens because the objects being shuffled are too large.
Can you try increasing the number of shuffle partitions?
.config("spark.sql.shuffle.partitions", "1000")
Or can you try adding these settings:
.config("spark.shuffle.spill.compress", true)
.config("spark.shuffle.compress", true)
Or you can try lowering the block size to reduce shuffle memory usage.
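To choose a larger value for spark.sql.shuffle.partitions, one rough approach is to divide the expected shuffle size by a target per-partition size. The 128 MiB target below is an assumption (a common rule of thumb, not a Spark default), the 500 GiB figure is made up for illustration, and the object and function names are hypothetical.

```scala
// Hypothetical estimator: the number of shuffle partitions that keeps the
// *average* partition near a target size. Skewed keys can still exceed it.
object ShufflePartitionEstimate {
  // Assumed target of 128 MiB per partition (rule of thumb, not a Spark setting).
  val TargetBytes: Long = 128L * 1024 * 1024

  def partitionsFor(totalShuffleBytes: Long, targetBytes: Long = TargetBytes): Int = {
    require(totalShuffleBytes >= 0 && targetBytes > 0, "sizes must be non-negative/positive")
    // Ceiling division, with at least one partition.
    math.max(1L, (totalShuffleBytes + targetBytes - 1) / targetBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    val total = 500L * 1024 * 1024 * 1024 // illustrative 500 GiB, not from the question
    println(partitionsFor(total))
  }
}
```

For the illustrative 500 GiB this gives 4000 partitions. Because the estimate is based on the average partition size, a single hot key can still produce one oversized block.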
Upvotes: 3