Alex

Reputation: 351

cassandra-spark-connector error with repartitionByCassandraReplica function

I'm trying to use the new join functionality from the 1.2 version, but I get an error with the repartitionByCassandraReplica function in the REPL.

I tried to reproduce the example from the documentation and created a Cassandra table (shopping_history) with a couple of elements: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md

import com.datastax.spark.connector.rdd._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._
import com.datastax.driver.core._

case class CustomerID(cust_id: Int)
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_))
val repartitioned = idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10)
repartitioned.first()

I get this error:

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:344)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I'm using Spark 1.2.0 with connector 1.2.0 RC 3. The joinWithCassandraTable function works on idsOfInterest.

I'm also curious about the differences between joinWithCassandraTable, cassandraTable with an IN clause, and the foreachPartition(withSessionDo) syntax.

Do they all request the data from the local node, which acts as coordinator? Is joinWithCassandraTable combined with repartitionByCassandraReplica as efficient as async queries that request data only from the local node? What happens if repartitionByCassandraReplica is not applied?

I've already asked this question on the Cassandra connector's Google Groups forum: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/b615ANGSySc

Thanks

Upvotes: 0

Views: 1233

Answers (1)

RussS

Reputation: 16576

I'll answer your second question here, and follow up on the first portion if I can figure something out based on more information.

I'm also curious about the differences between joinWithCassandraTable, cassandraTable with an IN clause, and the foreachPartition(withSessionDo) syntax.

cassandraTable with an IN clause will create a single Spark partition, so it may be fine for very small IN clauses, but the clause must be serialized from the driver to the Spark application. This can be really bad for large IN clauses, and in general we don't want to send data back and forth between the Spark driver and the executors if we don't have to.
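
A minimal sketch of what I mean, reusing the keyspace and table from your example (the exact where syntax can vary by connector version, so treat this as illustrative):

import com.datastax.spark.connector._

// Hypothetical small ID list: the whole IN clause is assembled on the driver
// and shipped with the query, and the result comes back as one Spark partition.
val fewIds = Seq(1, 2, 3)
val smallRead = sc.cassandraTable("cim_dev", "shopping_history")
  .where(s"cust_id IN (${fewIds.mkString(", ")})")
smallRead.collect().foreach(println)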

joinWithCassandraTable and foreachPartition(withSessionDo) are very similar. The main difference is that joinWithCassandraTable uses the connector's transformation and reading code, which should make it much easier to get Scala objects out of your Cassandra rows. In both cases your data stays in RDD form and won't be serialized back to the driver. Both will also use the partitioner from the previous RDD (or the last RDD which exposes preferredLocations), so they are able to work with repartitionByCassandraReplica.
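
Roughly, the two look like this; this is a sketch based on the case class and table from your question, and the column/type details are assumptions:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

// joinWithCassandraTable: the connector issues the per-key reads and maps the
// rows for you; the result stays distributed as an RDD of (CustomerID, CassandraRow).
val joined = idsOfInterest.joinWithCassandraTable("cim_dev", "shopping_history")

// foreachPartition(withSessionDo): the same access pattern, but you drive the
// session and consume the rows yourself.
val connector = CassandraConnector(sc.getConf)
idsOfInterest.foreachPartition { ids =>
  connector.withSessionDo { session =>
    val stmt = session.prepare("SELECT * FROM cim_dev.shopping_history WHERE cust_id = ?")
    ids.foreach { id =>
      val rs = session.execute(stmt.bind(id.cust_id: java.lang.Integer))
      // ... process rs here
    }
  }
}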

If repartitionByCassandraReplica is not applied, the data will be requested from a coordinator node that may or may not be a replica for the information you are requesting. This adds an extra network hop to your query, but it may not be a very large performance penalty. Whether you want to repartition before joining really depends on the total volume of data and the cost of the Spark shuffle in the repartition operation.
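
For that last point, a sketch of the two variants using the same keyspace/table as above:

// With the repartition step, keys are moved to executors co-located with the
// Cassandra replicas that own them, so the join can read locally.
val localReads = idsOfInterest
  .repartitionByCassandraReplica("cim_dev", "shopping_history", 10)
  .joinWithCassandraTable("cim_dev", "shopping_history")

// Without it, the join still works, but each lookup may pay one extra network
// hop through a coordinator that is not a replica.
val remoteReads = idsOfInterest.joinWithCassandraTable("cim_dev", "shopping_history")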

Upvotes: 2
