Vibhav Singh Rohilla

Reputation: 768

Spark Cassandra Java integration Issues

I am new to both Spark and Cassandra.

I am trying to run aggregations with Spark + Java on Cassandra data.

I am not able to fetch the Cassandra data in my code. I have read several discussions and found that there are compatibility issues between Spark and the Spark Cassandra connector, but I have not been able to resolve the problem.
My pom.xml is below (please excuse the extra dependencies; I still need to work out which library is causing the issue) -

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>IBeatCassPOC</groupId>
<artifactId>ibeatCassPOC</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>

    <!--CASSANDRA START-->
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.0.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.sparkjava</groupId>
        <artifactId>spark-core</artifactId>
        <version>2.5.4</version>
    </dependency>

    <!--https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10-->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>
    <!--CASSANDRA END-->
    <!-- Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>

    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.2</version>
    </dependency>

    <!-- Spark -->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- Spark-Kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>


    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
    </dependency>

    <!-- Google Collection Library -->
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0-rc2</version>
    </dependency>

    <!--UA Detector dependency for AgentType in PageTrendLog-->
    <dependency>
        <groupId>net.sf.uadetector</groupId>
        <artifactId>uadetector-core</artifactId>
        <version>0.9.12</version>
    </dependency>
    <dependency>
        <groupId>net.sf.uadetector</groupId>
        <artifactId>uadetector-resources</artifactId>
        <version>2013.12</version>
    </dependency>

    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>3.0.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>

    <!-- MongoDb Java Connector -->
    <!-- <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId>
        <version>2.13.0</version> </dependency> -->

</dependencies>
</project>

Java source code being used to fetch the data -

    import com.datastax.spark.connector.japi.CassandraJavaUtil;
    import com.datastax.spark.connector.japi.CassandraRow;
    import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;                
    import java.util.ArrayList;

    public class ReadCassData {
        public static void main(String[] args) {
            
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName("Spark-Cassandra Integration");
            sparkConf.setMaster("local[4]");
            sparkConf.set("spark.cassandra.connection.host", "stagingServer22");
            sparkConf.set("spark.cassandra.connection.port", "9042");

            sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
            sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
    
    
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            String keySpaceName = "testKeyspace";
            String tableName = "testTable";
            
            CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);
            System.out.println("Cassandra Count" + cassandraRDD.cassandraCount());
            final ArrayList<CassandraRow> data = new ArrayList<CassandraRow>();
            
            cassandraRDD.reduce(new Function2<CassandraRow, CassandraRow, CassandraRow>() {
                public CassandraRow call(CassandraRow v1, CassandraRow v2) throws Exception {
                    System.out.println("hello");
                    System.out.println(v1 + " ____ " + v2);
                    data.add(v1);
                    data.add(v2);
                    return null;
                }
            });
            System.out.println( "data Size -" + data.size());
            
        }
    }

Exception being encountered is -

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.getMetricsSources(Ljava/lang/String;)Lscala/collection/Seq;
        at org.apache.spark.metrics.MetricsUpdater$.getSource(MetricsUpdater.scala:20)
        at org.apache.spark.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:56)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:329)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
            
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I have a Cassandra cluster deployed at a remote location, and the Cassandra version being used is 3.9.

Please guide me on which dependencies are compatible. I cannot change my Cassandra version (currently 3.9). Please suggest which Spark / spark-cassandra-connector versions to use to successfully run map-reduce jobs against the database.

Upvotes: 2

Views: 468

Answers (3)

satish silveri

Reputation: 358

Following POM worked for me:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>


    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.6</version>
        <scope>system</scope>
        <systemPath>D:\Jars\tools-1.6.0.jar</systemPath>
    </dependency>
</dependencies>

Check it out. I successfully ingested streaming data from Kafka into Cassandra with this setup. Similarly, you can pull data into a JavaRDD.
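For reference, a minimal sketch of pulling a Cassandra table into a JavaRDD with this dependency set could look like the following (the host, keyspace, and table names are placeholders, not from the original post):

    import com.datastax.spark.connector.japi.CassandraJavaUtil;
    import com.datastax.spark.connector.japi.CassandraRow;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CassandraPullExample {
        public static void main(String[] args) {
            // Placeholder host, keyspace and table names -- replace with your own.
            SparkConf conf = new SparkConf()
                    .setAppName("cassandra-pull-example")
                    .setMaster("local[4]")
                    .set("spark.cassandra.connection.host", "127.0.0.1");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Read the table into a JavaRDD of CassandraRow and count it on the driver.
            JavaRDD<CassandraRow> rows = CassandraJavaUtil
                    .javaFunctions(sc)
                    .cassandraTable("my_keyspace", "my_table");
            System.out.println("Row count: " + rows.count());

            sc.stop();
        }
    }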

Upvotes: 0

Piyush_Rana

Reputation: 305

I have tried connecting with Spark and used the Spark Cassandra connector in Scala.

val spark = "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"

val sparkCore = "org.apache.spark" %% "spark-sql" % "1.6.1"

And below is my working code -

import com.datastax.driver.dse.graph.GraphResultSet
import com.spok.util.LoggerUtil
import com.datastax.spark.connector._
import org.apache.spark._

object DseSparkGraphFactory extends App {

  val dseConn = {
    LoggerUtil.info("Connecting with DSE Spark Cluster....")
    val conf = new SparkConf(true)
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.cassandra.connection.host", "Ip-Address")
    val sc = new SparkContext(conf)
    val rdd = sc.cassandraTable("spokg_test", "Url_p")
    rdd.collect().map(println)
  }
}

Upvotes: 2

FaigB

Reputation: 2281

Please refer to the Spark Cassandra Connector compatibility table for the connector release that matches the Spark version in your environment; depending on your Spark version it should be 1.5, 1.6, or 2.0.
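For example, a version-matched combination (illustrative versions only, assuming Spark 2.0.x built against Scala 2.11) would pair the artifacts like this:

    <!-- Illustrative, version-matched pair: Spark 2.0.x with connector 2.0.x (Scala 2.11) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>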

Upvotes: 0
