Reputation: 825
I have a large Parquet file with 25k columns that is about 10GB. I'm trying to view it and convert some rows to CSV.
All the tools I've tried so far have blown up (parquet-tools, fastparquet, pandas), so I'm now using PySpark, but I'm running into Java out-of-memory errors (java.lang.OutOfMemoryError: Java heap space).
My machine has 96GB of RAM. Before running Python, I set
export JAVA_OPTS="-Xms36g -Xmx90g"
I've also experimented with setting the driver memory to 80GB.
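If I understand correctly, spark.driver.memory only takes effect if it's applied before the JVM starts, so I assume it belongs in the SparkConf at context creation rather than being set on a running context, roughly like this (I may be wrong about that):
from pyspark import SparkConf, SparkContext

# Hypothetical variant: set driver memory up front, before the context (and JVM) exists
conf = SparkConf().setAppName("foo").set("spark.driver.memory", "80g")
sc = SparkContext(conf=conf)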
Here's the code I'm using. Unfortunately I can't share the data set.
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *
sc = SparkContext(appName="foo")
sqlContext = SQLContext(sc)
sc._conf.set('spark.driver.memory', '80g')
readdf = sqlContext.read.parquet('dataset.parquet')
readdf.head(2)
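For context, the end goal is something like the following, pulling a handful of columns and rows out to CSV (the column names below are made up since I can't share the real schema), but even the head(2) above blows up:
# Hypothetical end goal: select a few (placeholder) columns, take a few rows, write to CSV
subset = readdf.select('col_a', 'col_b').limit(100)
subset.coalesce(1).write.csv('subset_csv', header=True)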
Here's the error:
In [5]: readdf.head(2)
23/02/01 20:48:43 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/01 20:48:48 ERROR Executor: Exception in task 7.0 in stage 3.0 (TID 13)
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOfRange(Arrays.java:4030)
at java.base/java.lang.StringCoding.decodeUTF8(StringCoding.java:732)
at java.base/java.lang.StringCoding.decode(StringCoding.java:257)
at java.base/java.lang.String.<init>(String.java:507)
at java.base/java.lang.String.<init>(String.java:561)
at shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.readString(TCompactProtocol.java:687)
at org.apache.parquet.format.InterningProtocol.readString(InterningProtocol.java:216)
at org.apache.parquet.format.KeyValue$KeyValueStandardScheme.read(KeyValue.java:406)
at org.apache.parquet.format.KeyValue$KeyValueStandardScheme.read(KeyValue.java:384)
at org.apache.parquet.format.KeyValue.read(KeyValue.java:321)
at org.apache.parquet.format.FileMetaData$FileMetaDataStandardScheme.read(FileMetaData.java:1317)
at org.apache.parquet.format.FileMetaData$FileMetaDataStandardScheme.read(FileMetaData.java:1242)
at org.apache.parquet.format.FileMetaData.read(FileMetaData.java:1116)
at org.apache.parquet.format.Util.read(Util.java:362)
at org.apache.parquet.format.Util.readFileMetaData(Util.java:151)
at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1428)
at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1410)
at org.apache.parquet.format.converter.ParquetMetadataConverter$RangeMetadataFilter.accept(ParquetMetadataConverter.java:1205)
at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1410)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:582)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:99)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:342)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$Lambda$2673/0x0000000101256040.apply(Unknown Source)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2676/0x0000000101281040.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
23/02/01 20:48:49 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
    (same stack trace as above)
Any suggestions for dealing with this file? Thanks.
Upvotes: 0
Views: 288