M.R. Murazza

Reputation: 346

Error Running Cassandra from Spark in Java - NoClassDefFoundError at org.apache.spark.sql.catalyst

I am using Cassandra 3.0.3 and Spark 1.6.0, and I am trying to run code that combines the old documentation at http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java with the new one at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md.

Here is my pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>muhrafifm</groupId>
    <artifactId>spark-cass-twitterdw</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.6.0-M1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.6.0-M1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.1</version>
        </dependency>
    </dependencies>
</project>

The changes I made are basically to the javaFunctions calls. Here is one of the methods after I changed the javaFunctions usage according to the new documentation. I have also included import static com.datastax.spark.connector.japi.CassandraJavaUtil.*; at the top of the file.

private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    // Prepare the schema
    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    // Prepare the products hierarchy
    List<Product> products = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    JavaRDD<Product> productsRDD = sc.parallelize(products);       
    javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

    JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product product) throws Exception {
            return product.getParents().size() == 2;
        }
    }).flatMap(new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product product) throws Exception {
            Random random = new Random();
            List<Sale> sales = new ArrayList<>(1000);
            for (int i = 0; i < 1000; i++) {
                sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
            }
            return sales;
        }
    });
    javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}

And here is the error I got:

16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
    at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
    at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE

Is there anything I did wrong? It seems to require a package that I don't even use. Is there any way to fix that, or should I use a previous version of the spark-cassandra-connector?

Any response is appreciated. Thank you.

Upvotes: 4

Views: 5997

Answers (4)

Shashi Sharma

Reputation: 41

I was able to do this successfully.

My Scala version is 2.11.12.

Below is my pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sashi</groupId>
    <artifactId>SalesAnalysis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SalesAnalysis</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency> 

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.5</version>
        </dependency>               


        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.3.0</version>
        </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>

    </dependencies>
</project>

This is my spark-submit script:

spark-submit --class com.sashi.SalesAnalysis.CassandraSparkSalesAnalysis --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 /home/cloudera/Desktop/spark_ex/Cassandra/sales-analysis.jar
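
Note that --packages resolves the connector and its transitive dependencies at submit time, so the connector does not need to be bundled into the application jar itself.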

Upvotes: 0

Praveen

Reputation: 2177

I had the same problem, and the issue was compatibility between the Spark version and the Spark Cassandra Connector. I was using Spark 2.3, but the connector was an older version.

The version compatibility matrix is available here:

https://github.com/datastax/spark-cassandra-connector
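
As an illustration (a hypothetical pairing, not taken from my project), a Spark 2.3 build would line up with connector 2.3.x according to that matrix:

<!-- Illustrative only: Spark 2.3.x pairs with connector 2.3.x per the compatibility matrix -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.3.0</version>
</dependency>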

Upvotes: 2

sunone5

Reputation: 375

This is the POM I used for this application, and it ran completely without any issues with Java version 1.8.0_131 (javac 1.8.0_131). The complete application can be found here: https://github.com/sunone5/BigData/tree/master/spark-cassandra-streaming


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>spark-cassandra-streaming</groupId>
    <artifactId>spark-cassandra-streaming</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.6.0-M1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>    

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

Upvotes: 0

rhernando

Reputation: 1071

The code is looking for

org/apache/spark/sql/catalyst/package$ScalaReflectionLock$

so you should include the spark-sql library, which brings in the right dependency.
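
For reference, that class lives in the spark-catalyst module, which spark-sql depends on transitively. A minimal sketch of the extra dependency, assuming the Spark 1.6.0 / Scala 2.10 build from the question:

<!-- Sketch only: match the artifact suffix and version to your own Spark build -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>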

Upvotes: 5
