Reputation: 22832
I have parquet data in S3 partitioned by nyc_date in the format s3://mybucket/mykey/nyc_date=Y-m-d/*.gz.parquet.
I have a DateType column event_date that, for some reason, throws this error when I try to read from S3 and write to HDFS using EMR.
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.read.parquet('s3a://mybucket/mykey/')
df.limit(100).write.parquet('hdfs:///output/', compression='gzip')
Error:
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Here's what I figured out:
- Excluding event_date doesn't cause any errors.
- Reading with the glob 's3a://mybucket/mykey/*/*.gz.parquet' still throws the error.
- Really weird that this causes an error only for a DateType column; I don't have any other DateType columns.
Using Spark 2.0.2 and EMR 5.2.0.
Upvotes: 14
Views: 31288
Reputation: 1358
I had this exception when Spark was reading a Parquet file generated from a JSON file.
TL;DR: If possible, re-write the input Parquet with the expected schema forcefully applied.
The Scala code is below; the Python version won't be too different.
This is pretty much what my Parquet generation looked like at first:
spark.read
  .format("json")
  .load("<path-to-json-file>.json")
  .write
  .parquet("<path-to-output-directory>")
But the Spark job that read the above Parquet enforced a schema on the input, roughly like this:
val structType: StructType = StructType(fields = Seq(...))
spark.read
  .schema(structType)
  .parquet("<path-to-parquet-directory>")
And that is where the exception occurs.
FIX: To fix the exception, I had to forcefully apply the schema to the data I generated:
spark.read
  .schema(structType) // <===
  .format("json")
  .load("<path-to-json-file>.json")
  .write
  .parquet("<path-to-output-directory>")
To my understanding, the reason for the exception in my case was not (only) the StringType -> DateType conversion, as for @kamil-sindi. It was also the fact that, when reading JSON, Spark assigns LongType to all numeric values. Thus, my Parquet was saved with LongType fields, and the Spark job reading that Parquet presumably struggled to convert LongType to IntegerType.
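As a small illustration of that last point (the column name n is made up): spark.read.json infers LongType for every numeric field, so if the consuming job expects IntegerType you either enforce a schema up front, as above, or cast explicitly:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df = spark.read.format("json").load("<path-to-json-file>.json")
df.printSchema()  # numeric JSON fields come back as long

# Hypothetical column; cast it down before writing if the
# reading job enforces IntegerType on this field.
(df.withColumn("n", col("n").cast(IntegerType()))
   .write
   .parquet("<path-to-output-directory>"))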
Upvotes: 1
Reputation: 1
I know I'm late to the party...
I had a similar issue: I read several parquet directories, unioned them, and tried to write the result.
My fix was to add a .select(...) before the write; see the sketch below.
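A minimal sketch of that workaround, with made-up paths and column names:
df1 = spark.read.parquet("<path-to-parquet-dir-1>")
df2 = spark.read.parquet("<path-to-parquet-dir-2>")

# union() matches columns by position, not by name, so select the same
# explicit column list on both sides first; these column names are hypothetical.
cols = ["id", "event_date", "value"]
(df1.select(*cols)
    .union(df2.select(*cols))
    .write
    .parquet("<path-to-output-directory>"))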
Upvotes: -6
Reputation: 22832
I just used StringType instead of DateType when writing parquet. Don't have the issue anymore.
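For anyone wanting to do the same, a minimal sketch of that cast on the generating side; the source DataFrame df and the nyc_date partitioning are taken from the question, the rest is an assumption:
from pyspark.sql.functions import col

# Write event_date as a plain string instead of a DateType column,
# so readers no longer hit the dictionary decoding error.
(df.withColumn('event_date', col('event_date').cast('string'))
   .write
   .partitionBy('nyc_date')
   .parquet('s3a://mybucket/mykey/', compression='gzip'))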
Upvotes: 5