Eternal_Explorer

Reputation: 1155

Spark not saving the DataFrame as a Parquet file

I am trying to save a Spark DataFrame as a Parquet file, but it fails with the exception below. The DataFrame is constructed from Kafka stream RDDs. Kindly guide me if I am missing something.

dataframe.write.parquet("/user/space")

Exception stack trace:

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:59)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:309)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:280)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 50 more

A snapshot of the pom.xml used:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Paymentprocessor</groupId>
    <artifactId>research</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>


    <name>research</name>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.1</spark.version>
        <scalaCompatVersion>2.10</scalaCompatVersion>
        <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>

    </properties>

    <repositories>
        <repository>
            <id>central</id>

            <name>Maven Repository</name>
            <url>https://repo1.maven.org/maven2</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>

    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> 
            <version>2.8.0</version> </dependency> -->

        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>


        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>




        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>com.jsuereth</groupId>
            <artifactId>scala-arm_2.10</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-producer_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-v09_2.10</artifactId> 
            <version>1.6.1-mapr-1607</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> 
            <artifactId>spark-streaming-kafka-producer_2.10</artifactId> <version>1.6.1-mapr-1607</version> 
            </dependency> -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
            <version>2.0.0-preview</version>
        </dependency>

    </dependencies>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
            </plugin>


        </plugins>
    </build>
</project>

Code snippet:

val messagesDStream: InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
}

val valuesDStream: DStream[String] = messagesDStream.map(_._2)

println("Count value " + valuesDStream.count())

/* Construct a DataFrame from each Kafka RDD and write it out as Parquet */
valuesDStream.foreachRDD { rdd =>
  // There is at least one element in the RDD
  if (!rdd.isEmpty) {
    val count = rdd.count
    println("count received " + count)

    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val cdrDF = rdd.map(CallCreditCardRecord.parseCallCreditCardRecord).toDF()
    val cardRDD = cdrDF.cache()
    println("Printing")

    cdrDF.registerTempTable("Card")
    cdrDF.printSchema()
    cdrDF.show()

    cdrDF.write.format("parquet").save("/usr/local/Cellar/hadoop/hdfs/tmp/nm-local-dir/CreditCardRecord.parquet")
  }
}

ssc.start()
//ssc.awaitTermination()

ssc.stop(stopSparkContext = true, stopGracefully = true)

Upvotes: 2

Views: 2046

Answers (2)

Paulo Moreira

Reputation: 631

I had the same issue trying to read Avro, and fixed it by changing my Gradle dependencies to match the Spark version:

dependencies {
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'

    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'

    implementation group: 'org.apache.spark', name: 'spark-avro_2.12', version: '2.4.5'
}
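If you want to double-check which Spark version actually ends up on the classpath after such a change, Gradle can print the resolved dependency graph. A quick check, assuming a standard Gradle Java/Scala build (the configuration name may differ, e.g. runtime on older Gradle versions):

./gradlew dependencies --configuration runtimeClasspath | grep org.apache.spark

Every spark-* artifact listed should resolve to the same version.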

Upvotes: 1

Tzach Zohar

Reputation: 37832

You seem to be mixing different Spark versions: most likely your cluster (master / workers) runs one Spark version while your driver application runs another, so you get a ClassNotFoundException for a class that only exists in one of these versions.

Specifically, the class org.apache.spark.sql.execution.datasources.FileFormat was only created ~2 weeks ago (by this commit) and isn't part of any official Spark release yet: are you using Spark's "latest master" version in one of your components? If so - either use it in all components (but be prepared to see some bugs and rough edges), or make sure all of your code is compiled and executed with one official version.
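(As a quick sanity check, you can print the version the cluster is actually running from the driver and compare it with the version your application was compiled against. A minimal sketch, assuming sc is your SparkContext:

println("Runtime Spark version: " + sc.version)

If this does not match the spark.version in your build file, you have found the mismatch.)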

EDIT (after pom file posted): your pom file contains two different Spark versions - 1.6.1 for most dependencies, and 2.0.0-preview for the last one:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
  <version>2.0.0-preview</version>
</dependency> 

You should remove this dependency (it's not needed in 1.6.1).
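After removing it, one way to confirm that only a single Spark version remains in the build is to inspect Maven's resolved dependency tree, for example:

mvn dependency:tree -Dincludes=org.apache.spark

All org.apache.spark artifacts listed should report the same version (1.6.1 here); any remaining 2.0.0-preview entry means the mix is still present.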

Upvotes: 3
