Reputation: 2730
I'm running this code in spark-shell (Spark 2.3.0):
val lineitem = spark.read.parquet("hdfs://namenode:8020/lineitem.parquet")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val increase = udf { (x: Double, y: Double) => x * (1 + y) }
val q1 = lineitem.filter($"l_shipdate" <= "1998-09-02")
  .groupBy($"l_returnflag", $"l_linestatus")
  .agg(sum($"l_quantity"), sum($"l_extendedprice"),
    sum(decrease($"l_extendedprice", $"l_discount")),
    sum(increase(decrease($"l_extendedprice", $"l_discount"), $"l_tax")),
    avg($"l_quantity"), avg($"l_extendedprice"), avg($"l_discount"), count($"l_quantity"))
  .sort($"l_returnflag", $"l_linestatus")
Everything works fine up to this point, but when I try to measure the query's execution time with spark.time(q1.show()), I get:
2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 0.0 in stage 9.0 (TID 77)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
    at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
    at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:205)
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary.<init>(PlainValuesDictionary.java:194)
    at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:98)
    at org.apache.parquet.column.Encoding$4.initDictionary(Encoding.java:149)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 2.0 in stage 9.0 (TID 79)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
    at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
    at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:205)
Any idea what's wrong and how to solve it?
Upvotes: 0
Views: 323
Reputation: 4089
I believe the problem is not related to spark.time. The problem is that Spark cannot read the Snappy-compressed Parquet files. The first block of code you posted only defines transformations, so Spark does not actually execute anything there. Remember, Spark uses lazy evaluation.
It is not until you call q1.show() that Spark actually executes the query, and that is what triggers the error while reading the Snappy-compressed files.
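To see this in isolation, here is a minimal sketch (the path and columns are placeholders mirroring your example, not anything from your cluster): everything up to the aggregation is lazy, and only the action scans and decompresses the Parquet files, which is the first point where Snappy is invoked.
// Imports are already in scope in spark-shell; shown here for completeness.
import org.apache.spark.sql.functions._
import spark.implicits._

// Building the DataFrame and the aggregation only constructs a query plan;
// no Parquet file is opened or decompressed yet.
val df = spark.read.parquet("hdfs://namenode:8020/lineitem.parquet")  // placeholder path
val plan = df.filter($"l_shipdate" <= "1998-09-02")
  .groupBy($"l_returnflag")
  .agg(sum($"l_quantity"))

// The action forces the file scan, so the Snappy decompression (and the
// UnsatisfiedLinkError) happens here, with or without spark.time around it.
plan.show()
spark.time(plan.show())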
What you really need to troubleshoot is the java.lang.UnsatisfiedLinkError. There was a recently fixed issue involving a conflict between the snappy-java versions used by Spark 2.3.0 and Hadoop 2.8.3.
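One quick check before upgrading is to see which snappy-java jar the driver classloader actually resolved. This is just a sketch from the spark-shell; the printed path and version depend entirely on your installation.
// Locate the jar that provides org.xerial.snappy.Snappy on the driver.
// (getCodeSource can be null for bootstrap classes, but snappy-java comes from a jar.)
val snappyJar = classOf[org.xerial.snappy.Snappy].getProtectionDomain.getCodeSource.getLocation
println(snappyJar)
// prints something like file:/.../jars/snappy-java-x.y.z.jar, depending on your setup
If the jar picked up at runtime comes from the Hadoop installation rather than from Spark's own jars directory, that mismatch would explain the native-method lookup failing.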
It seems that updating to Spark 2.3.2 fixes the issue.
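If you do upgrade, a quick sanity check from the new shell (assuming q1 is defined exactly as in your question) would be:
// Confirm the runtime Spark version, then re-run the timed action.
println(spark.version)   // expected to print 2.3.2 after the upgrade
spark.time(q1.show())    // should now complete and report the elapsed time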
Upvotes: 5