Paul Trehiou

Reputation: 537

Spark crashes when processing big files

I have a Scala program that reads a CSV file, adds a new column to the DataFrame and saves the result as a Parquet file. It works perfectly on small files (< 5 GB), but when I try bigger files (~80 GB) it always fails when it should write the Parquet file, with this stack trace:

16/10/20 10:03:37 WARN scheduler.TaskSetManager: Lost task 14.0 in stage 4.0 (TID 886, 10.0.0.10): java.io.EOFException: reached end of stream after reading 136445 bytes; 1245184 bytes expected
  at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
  at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
  at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

If anyone knows what could cause this, it would help me a lot!

System used

Everything runs in Docker on a 6-machine cluster (each machine has 4 cores and 16 GB of RAM).

Example code

import spark.implicits._  // needed for the $"ipix" column syntax

var df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NULL").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName)))  // a2p is a UDF defined elsewhere
df.repartition(nPartitions, $"ipix").write.mode("overwrite").parquet(hdfsFileURLOut)

Upvotes: 1

Views: 1811

Answers (3)

Igor Berman

Reputation: 1532

Here are a few points that might help you:

  1. Check the distribution of your ipix column: you may have data skew, so one or a few partitions could be much bigger than the others, and a single task working on such a fat partition can fail. It probably has something to do with the output of your a2p function. I would first try to run the job without repartitioning (just remove that call and see if it succeeds; without it, Spark uses its default partitioning, roughly based on the size of the input CSV file). A quick way to inspect the ipix distribution is sketched after this list.

  2. I also hope that your input CSV is not gzipped, since gzipped data is not splittable, so all of it ends up in a single partition.
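
A minimal sketch of one way to inspect the ipix distribution, assuming the DataFrame df from the question (the check itself is not part of the answer above, just an illustration):

import org.apache.spark.sql.functions.desc

// Count rows per ipix value and show the heaviest ones; a handful of values
// dominating the counts would mean skewed partitions after
// repartition(nPartitions, $"ipix").
df.groupBy("ipix").count().orderBy(desc("count")).show(20)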

Upvotes: 2

Kris

Reputation: 1724

The problem looks like a read failure while decompressing a stream of shuffled data in YARN mode.

Try the following code and see how it goes.

import org.apache.spark.storage.StorageLevel
import spark.implicits._  // needed for the $"ipix" column syntax

var df = spark.read.option("header", "true").option("inferSchema", "true").option("nullValue", "NULL").csv(hdfsFileURLIn)
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK)
df.repartition(nPartitions, $"ipix").write.mode("overwrite").parquet(hdfsFileURLOut)

There is also a similar issue: Spark job failing in YARN mode

Upvotes: 0

apolak

Reputation: 141

Can you provide the code? Perhaps the code you wrote is running on the driver? How do you process the file?

Spark has special functionality for handling big data, for example RDDs. Once you do:

someRdd.collect()

you bring the RDD into the driver's memory and are therefore not using Spark's distributed abilities. Code that handles big data should run on the workers.
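
As a hedged sketch of the difference (someRdd and the output path are placeholders, not taken from the question's code):

// Pulls the entire dataset into the driver's memory: fine for small results,
// but it defeats Spark's distributed execution on large data.
val local = someRdd.collect()

// Keeps the work on the executors: each partition is written out in parallel
// and the full dataset is never materialized on the driver.
someRdd.saveAsTextFile("hdfs:///path/to/output")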

Please check this: differentiate driver code and work code in Apache Spark

Upvotes: 0
