Reputation: 117
I am trying to query data from Parquet files in Scala Spark (1.5), including a query that returns 2 million rows ("variants" in the following code).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")
val parquetFile = sqlContext.read.parquet(<path>)
parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")
val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...")
val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...")
This runs fine when the number of rows fetched is low, but fails with a "Size exceeds Integer.MAX_VALUE" error when a large amount of data is requested. The error looks as follows:
User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,
most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException:
Size exceeds Integer.MAX_VALUE at
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at ...
What can I do to make this work?
This looks like a memory issue, but I have tried using up to 100 executors with no difference (the time it takes to fail stays the same regardless of the number of executors involved). It feels like the data isn't getting partitioned across the nodes?
I have attempted to force higher parallelization by naively replacing this line, to no avail:
val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ...").repartition(sc.defaultParallelism * 10)
Upvotes: 6
Views: 2706
Reputation: 160
You can also try to limit the partition size by using the following Spark config:
spark.sql.files.maxPartitionBytes = <Size in bytes> (Default 128 MB)
According to https://spark.apache.org/docs/latest/sql-performance-tuning.html:
The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
This ensures that at any given time each core of your node only works on the volume specified. Spark will create an appropriate number of partitions based on this value, increasing the parallelism of your program.
A good rule of thumb is to set this value to 0.25 * memory per core.
For example, if your node has 16 cores and 32 GB of memory:
mem_per_core = 32 GB / 16 = ~2 GB (ignoring overhead)
max_partition_mb = 0.25 * (2 * 1024) = 512 MB
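For reference, a minimal sketch of applying this in code, reusing the sqlContext and <path> placeholder from the question (note that, as far as I know, spark.sql.files.maxPartitionBytes only takes effect from Spark 2.0 onwards and only for file-based sources such as Parquet):
// Cap each input partition at 512 MB, the rule-of-thumb value computed above
sqlContext.setConf("spark.sql.files.maxPartitionBytes", (512L * 1024 * 1024).toString)
val parquetFile = sqlContext.read.parquet("<path>")
// More, smaller partitions means higher parallelism and no single over-sized block
println(parquetFile.rdd.partitions.length)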
Upvotes: 0
Reputation: 8996
I don't believe the issue is Parquet-specific. You are hitting a limitation on the maximum size of a partition in Spark.
Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at ...
The Integer.MAX_VALUE error indicates that (I believe) the size of a partition is more than 2 GB (it would require more than an int32 to index it).
The comment from Joe Widen is spot on. You need to repartition your data even more. Try 1000 or more.
E.g., by repartitioning the underlying RDD and rebuilding the DataFrame:
val df = sqlContext.read.parquet("data.parquet")
val data = sqlContext.createDataFrame(df.rdd.repartition(1000), df.schema)
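As a quick sanity check (a sketch reusing the data value above), you can confirm the repartitioning took effect before re-running the heavy queries:
// Should report 1000 partitions, each comfortably under the 2 GB limit
println(data.rdd.partitions.length)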
Upvotes: 7