Marzi Heidari

Reputation: 2730

spark.time() not working for dataframe query

I'm running this code in spark-shell, spark 2.3.0:

val lineitem = spark.read.parquet("hdfs://namenode:8020/lineitem.parquet")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val increase = udf { (x: Double, y: Double) => x * (1 + y) }
val q1 = lineitem.filter($"l_shipdate" <= "1998-09-02")
  .groupBy($"l_returnflag", $"l_linestatus")
  .agg(sum($"l_quantity"), sum($"l_extendedprice"),
    sum(decrease($"l_extendedprice", $"l_discount")),
    sum(increase(decrease($"l_extendedprice", $"l_discount"), $"l_tax")),
    avg($"l_quantity"), avg($"l_extendedprice"), avg($"l_discount"), count($"l_quantity"))
  .sort($"l_returnflag", $"l_linestatus")

Everything works fine up to this point. But when I try to measure the execution time of the query using spark.time(q1.show()), I get:

2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 0.0 in stage 9.0 (TID 77)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
        at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
        at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
        at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:205)
        at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary.<init>(PlainValuesDictionary.java:194)
        at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:98)
        at org.apache.parquet.column.Encoding$4.initDictionary(Encoding.java:149)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:312)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:258)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
2018-12-22 17:49:56 ERROR Executor:91 - Exception in task 2.0 in stage 9.0 (TID 79)
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
        at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
        at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
        at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:205)

Any idea what's wrong and how to solve it?

Upvotes: 0

Views: 323

Answers (1)

andrew

Reputation: 4089

I believe the problem is not related to spark.time. The problem is caused by not being able to read the Snappy compressed files. The first block of code you posted is just a transformation, meaning Spark does not actually try to execute it. Remember, Spark uses lazy evaluation.

It is not until you call q1.show(), which is an action, that Spark actually tries to execute the query. That is what triggers the error while reading the Snappy-compressed files, whether or not the call is wrapped in spark.time.
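To see the lazy behaviour in isolation, here is a minimal sketch for spark-shell that does not touch Parquet or Snappy at all. The tiny in-memory dataset and the names df, q and total are made up for illustration; only spark.time, explain, first and show come from Spark itself.

```scala
// Paste into spark-shell, where `spark` and spark.implicits._ are already in scope.
import org.apache.spark.sql.functions.sum

// Transformations only: Spark records a plan here, no job is started.
val df = spark.range(0, 1000).toDF("n")          // hypothetical stand-in for lineitem
val q  = df.filter($"n" % 2 === 0).agg(sum($"n").as("total"))

// Printing the plan still does not run the query.
q.explain()

// An action: this is the first point at which data is actually read and processed,
// so this is also where a read error would first surface.
val total = q.first().getLong(0)

// spark.time only wraps whatever it is given and prints the elapsed time;
// the job is started by the action inside it, not by spark.time itself.
spark.time(q.show())
```

If the same failure shows up with a plain q1.show() (no spark.time around it), that would confirm the timing wrapper is not the cause.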

What you really need to troubleshoot is the java.lang.UnsatisfiedLinkError. There was a recently fixed issue involving a conflict between the Snappy versions used by Spark 2.3.0 and Hadoop 2.8.3:

https://issues.apache.org/jira/browse/SPARK-24018?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

It seems updating to Spark 2.3.2 fixes the issue.
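If you want to check whether a mismatched snappy-java is really on the classpath before upgrading, a rough diagnostic along these lines may help. It is only a sketch: it inspects the driver JVM of spark-shell (executors on other machines can have a different classpath), and the names snappyJar, compressed, direct and len are made up for illustration.

```scala
// Paste into spark-shell. This only inspects the JVM the shell itself runs in.
import java.nio.ByteBuffer
import org.xerial.snappy.Snappy

// Which jar the Snappy class was actually loaded from.
val snappyJar = classOf[Snappy].getProtectionDomain.getCodeSource.getLocation
println(s"snappy-java loaded from: $snappyJar")

// Compress a few bytes, then copy them into a direct buffer.
val compressed = Snappy.compress("hello".getBytes("UTF-8"))
val direct = ByteBuffer.allocateDirect(compressed.length)
direct.put(compressed)
direct.flip()

// Snappy.uncompressedLength(ByteBuffer) goes through the same native entry point
// that appears at the top of the stack trace. If the Java classes and the native
// library come from different snappy-java versions, this call is where an
// UnsatisfiedLinkError would be expected.
val len = Snappy.uncompressedLength(direct)
println(s"uncompressed length: $len")
```

If the printed jar location points at a Hadoop directory rather than Spark's own jars, that would be consistent with the version conflict described in the JIRA issue.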

Upvotes: 5
