Rodrigo_V

Reputation: 180

Scala Spark Job in Dataproc cluster returns java.util.NoSuchElementException: None.get

I get the error

ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get

when I run my job on a Dataproc cluster; when I run it locally it works perfectly. I have reproduced the issue with the following toy example.

package com.deequ_unit_tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object reduce_by_key_example {
  def main(args: Array[String]): Unit = {

  // Set the log level to only print errors
  Logger.getLogger("org").setLevel(Level.ERROR)

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  println("Step 1")
  val data = Seq(("Project", 1),
    ("Gutenberg’s", 1),
    ("Alice’s", 1),
    ("Adventures", 1),
    ("in", 1),
    ("Wonderland", 1),
    ("Project", 1),
    ("Gutenberg’s", 1),
    ("Adventures", 1),
    ("in", 1),
    ("Wonderland", 1),
    ("Project", 1),
    ("Gutenberg’s", 1))

  println("Step 2")
  val rdd = spark.sparkContext.parallelize(data)

  println("Step 3")
  val rdd2 = rdd.reduceByKey(_ + _)

  println("Step 4")
  rdd2.foreach(println)
  }
}

When I run this job in Dataproc, I get this error when executing the line

rdd2.foreach(println)

As additional information, I should mention that I wasn't getting this error until some changes were applied to my company's Dataproc cluster. Colleagues using PySpark, with an equivalent PySpark version of the example above, fixed it by changing

  sc = SparkContext('local')

to

  sc = SparkContext()

did the trick, but I couldn't find an equivalent solution in Scala Spark. Do you have any idea what could be causing this issue? Any help is welcome.

Upvotes: 2

Views: 1020

Answers (2)

Gara Walid

Reputation: 455

I had a similar issue with the following log:

ERROR Executor:94 - Exception in task 0.0 in stage 1.0 (TID 1)
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.SparkEnv.getShuffleEndpointClient(SparkEnv.scala:90)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommitRemote(IndexShuffleBlockResolver.scala:294)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:282)
    at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:120)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedData(BypassMergeSortShuffleWriter.java:224)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

I solved this by setting the master to yarn. I was submitting the Spark job in client mode to a Dataproc cluster (gcloud dataproc jobs submit spark):

  val spark: SparkSession = SparkSession.builder()
    .master("yarn")
    .appName("SparkByExamples.com")
    .getOrCreate()
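
For reference, such a submission looks roughly like the command below. The cluster name, region, bucket and jar path are placeholders, and the class is the one from the question; adjust them to your setup.

  gcloud dataproc jobs submit spark \
    --cluster=my-cluster \
    --region=us-central1 \
    --class=com.deequ_unit_tests.reduce_by_key_example \
    --jars=gs://my-bucket/my-job.jar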

Upvotes: 0

Nassereddine BELGHITH

Reputation: 640

  1. Configure your pom.xml (or your build.sbt; an sbt sketch follows the pom below) as follows:

Add the provided scope to the Spark dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>stackOverFlowGcp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.3</version>
            <scope>provided</scope>


        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.3</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.4.0</version>
            <scope>provided</scope>

        </dependency>


    </dependencies>


    <build>
        <plugins>
            <!-- Maven Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <!-- assembly Maven Plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>mainPackage.mainObject</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

    </build>


</project>
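
For sbt users, a rough build.sbt equivalent of the pom above (a sketch: it assumes Scala 2.11.12 to match the _2.11 artifacts, and sbt-assembly added in project/plugins.sbt for the fat-jar step):

  // build.sbt (sketch): same dependencies as the pom, marked "provided"
  // so the Spark shipped with the Dataproc image is used at runtime
  name := "stackOverFlowGcp"
  version := "1.0-SNAPSHOT"
  scalaVersion := "2.11.12"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.2.3" % "provided",
    "org.apache.spark" %% "spark-sql"  % "2.2.3" % "provided",
    "com.typesafe"     %  "config"     % "1.4.0" % "provided"
  )

  // requires sbt-assembly; "sbt assembly" then builds the fat jar
  mainClass in assembly := Some("mainPackage.mainObject")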

  2. Build your package: clean => rebuild => compile => package. The main object, with the hard-coded master removed so that Dataproc can supply it:

package mainPackage
import org.apache.spark.sql.SparkSession

object mainObject {


  def main(args: Array[String]): Unit = {


    val spark: SparkSession = SparkSession.builder()
      //.master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    println("Step 1")
    val data = Seq(("Project", 1),
      ("Gutenberg’s", 1),
      ("Alice’s", 1),
      ("Adventures", 1),
      ("in", 1),
      ("Wonderland", 1),
      ("Project", 1),
      ("Gutenberg’s", 1),
      ("Adventures", 1),
      ("in", 1),
      ("Wonderland", 1),
      ("Project", 1),
      ("Gutenberg’s", 1))

    println("Step 2")
    val rdd = spark.sparkContext.parallelize(data)
    println("Step 3")
    val rdd2 = rdd.reduceByKey(_ + _)

    println("Step 4")
    rdd2.foreach(println)


  }
}
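
From the command line, this step boils down to running Maven's package phase. The jar name below is an assumption based on the artifactId and the jar-with-dependencies descriptor in the pom above:

  mvn clean package
  # expected output (assumption):
  #   target/stackOverFlowGcp-1.0-SNAPSHOT-jar-with-dependencies.jar
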
  3. Create your Dataproc cluster (see the gcloud sketch below)
  4. Run the Spark job on Dataproc
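
A rough gcloud sketch for step 3; the names are placeholders. For step 4, the fat jar can then be submitted with gcloud dataproc jobs submit spark, pointing --class at mainPackage.mainObject and --jars at the assembled jar in a GCS bucket.

  # step 3: create the cluster (name and region are placeholders)
  gcloud dataproc clusters create my-cluster --region=us-central1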

In Dataproc you will not see the output of rdd2.foreach(println) in the driver output, because foreach runs on the executors and their stdout ends up in the executor logs; read up on how Dataproc handles executor logs if you want the details. However, you can show a DataFrame on the driver if you like.
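
For example, a minimal sketch (assuming the rdd2 from the code above, with spark.implicits in scope):

  import spark.implicits._

  // Turn the result into a DataFrame; show() prints on the driver,
  // so the output appears in the Dataproc job's driver output
  val df = rdd2.toDF("word", "count")
  df.show()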

(screenshots of the job running successfully in Dataproc)

As you can see, in Dataproc everything works fine. Don't forget to shut down or delete the cluster when you are finished ;)

Upvotes: 2
