Vishaal

Reputation: 825

Parquet file too wide to work with in PySpark

I have a large Parquet file (~10 GB) with 25,000 columns. I'm trying to view it and convert some rows to CSV.

Every tool I've tried so far (parquet-tools, fastparquet, pandas) has blown up, so I'm now using PySpark, but I'm running into Java out-of-memory errors (java.lang.OutOfMemoryError: Java heap space).

My machine has 96 GB of RAM. Before launching Python, I set

export JAVA_OPTS="-Xms36g -Xmx90g"

I've also experimented with setting the driver memory to 80 GB.
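
I'm not sure JAVA_OPTS is even picked up by the JVM that Spark launches, so I assume the memory settings really need to go through Spark's own launcher, something like:

# Assumption: passing driver memory via PYSPARK_SUBMIT_ARGS before starting Python.
export PYSPARK_SUBMIT_ARGS="--driver-memory 80g pyspark-shell"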

Here's the code I'm using; unfortunately, I can't share the dataset.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# spark.driver.memory has to be set before the JVM starts, so it goes on
# the SparkConf up front; setting it on an already-created context is a no-op.
conf = SparkConf().setAppName('foo').set('spark.driver.memory', '80g')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet('dataset.parquet')
df.head(2)

Here's the error:

In [5]: df.head(2)                                                                                                                                                                           
23/02/01 20:48:43 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/01 20:48:48 ERROR Executor: Exception in task 7.0 in stage 3.0 (TID 13)
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOfRange(Arrays.java:4030)
    at java.base/java.lang.StringCoding.decodeUTF8(StringCoding.java:732)
    at java.base/java.lang.StringCoding.decode(StringCoding.java:257)
    at java.base/java.lang.String.<init>(String.java:507)
    at java.base/java.lang.String.<init>(String.java:561)
    at shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.readString(TCompactProtocol.java:687)
    at org.apache.parquet.format.InterningProtocol.readString(InterningProtocol.java:216)
    at org.apache.parquet.format.KeyValue$KeyValueStandardScheme.read(KeyValue.java:406)
    at org.apache.parquet.format.KeyValue$KeyValueStandardScheme.read(KeyValue.java:384)
    at org.apache.parquet.format.KeyValue.read(KeyValue.java:321)
    at org.apache.parquet.format.FileMetaData$FileMetaDataStandardScheme.read(FileMetaData.java:1317)
    at org.apache.parquet.format.FileMetaData$FileMetaDataStandardScheme.read(FileMetaData.java:1242)
    at org.apache.parquet.format.FileMetaData.read(FileMetaData.java:1116)
    at org.apache.parquet.format.Util.read(Util.java:362)
    at org.apache.parquet.format.Util.readFileMetaData(Util.java:151)
    at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1428)
    at org.apache.parquet.format.converter.ParquetMetadataConverter$3.visit(ParquetMetadataConverter.java:1410)
    at org.apache.parquet.format.converter.ParquetMetadataConverter$RangeMetadataFilter.accept(ParquetMetadataConverter.java:1205)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1410)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:582)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:99)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:342)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$Lambda$2673/0x0000000101256040.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan$$Lambda$2676/0x0000000101281040.apply(Unknown Source)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
23/02/01 20:48:49 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
    ... (identical stack trace as above)
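
For context, the end result I'm after is roughly this (col_a and col_b are hypothetical column names, since I can't share the real schema):

# Hypothetical column names -- the real file has ~25k columns.
subset = df.select('col_a', 'col_b').limit(100)
subset.write.csv('subset_csv', header=True)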

Any suggestions for dealing with this file? Thanks

Upvotes: 0

Views: 288

Answers (0)
