Nithyananda Shetty

Reputation: 25

Scala: joinWithCassandraTable result to DataFrame

I'm using the DataStax Spark Cassandra Connector to access some data in Cassandra. My requirement is to join an RDD with a Cassandra table, fetch the result, and store it in a Hive table.

I'm using joinWithCassandraTable to join with the Cassandra table. After the join, the resulting RDD looks like this:

com.datastax.spark.connector.rdd.CassandraJoinRDD[org.apache.spark.sql.Row, 
com.datastax.spark.connector.CassandraRow] = 
CassandraJoinRDD[17] at RDD at CassandraRDD.scala:19

I tried the following steps to convert it to a DataFrame, but none of the approaches work:

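import org.apache.spark.sql.Row

// keep only the Cassandra side of each (left, right) pair and turn its values into a Spark SQL Row
val data = joinWithRDD.map { case (_, cassandraRow) =>
  Row(cassandraRow.columnValues: _*)
}

// schema: a StructType matching the Cassandra columns, defined elsewhere
sqlContext.createDataFrame(data, schema)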

I'm getting the following error:

java.lang.ClassCastException: cannot assign instance of
   scala.collection.immutable.List$SerializationProxy to field 
   org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of 
   type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Can you please help me convert the result of joinWithCassandraTable to a DataFrame?

Upvotes: 0

Views: 684

Answers (1)

Alex Ott

Reputation: 87164

As I see, you're using a DataFrame on the left side of the join. Instead of joinWithCassandraTable, which uses the RDD API, I recommend taking Spark Cassandra Connector 2.5.x (2.5.1 is the latest at the time of writing), which supports joins in the DataFrame API, and using that directly. It's really easy: you just need to start your job with --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions to activate this functionality. After that, the code just uses normal joins on DataFrames:

import org.apache.spark.sql.DataFrame

val parsed: DataFrame = ??? // ...some DataFrame with a "ticker" column...
val cassandra = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "stock_info", "keyspace" -> "test"))
  .load()

// A left join lets us detect incorrect data: if some entry has no matching row in
// Cassandra, its "symbol" field will be null, so we can find such entries and handle them.
// If we omit the joinType parameter (inner join), we only process data that exists in Cassandra.
val joined = parsed.join(cassandra, cassandra("symbol") === parsed("ticker"), "left")
  .drop("ticker")
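To illustrate what the "left" join type changes, here is a minimal plain-Scala sketch that models the join on in-memory collections instead of Spark. The object LeftJoinSketch, the case classes Parsed and StockInfo, and the sample tickers are all hypothetical. It also assumes symbol is unique in the Cassandra table (as it would be if it were the primary key), so at most one row matches each ticker. Here None stands in for the null columns Spark fills in for unmatched rows.

```scala
// Plain-Scala model of left vs inner join semantics (no Spark or Cassandra involved).
// All names and data here are hypothetical, for illustration only.
object LeftJoinSketch {
  final case class Parsed(ticker: String, qty: Int)
  final case class StockInfo(symbol: String, price: Double)

  // Left join: every parsed row is kept; the right side is None when there is no match.
  // Assumes "symbol" is unique, so building a Map loses no rows.
  def leftJoin(parsed: Seq[Parsed], cassandra: Seq[StockInfo]): Seq[(Parsed, Option[StockInfo])] = {
    val bySymbol: Map[String, StockInfo] = cassandra.map(s => s.symbol -> s).toMap
    parsed.map(p => p -> bySymbol.get(p.ticker))
  }

  // Inner join: only rows that have a match are kept.
  def innerJoin(parsed: Seq[Parsed], cassandra: Seq[StockInfo]): Seq[(Parsed, StockInfo)] =
    leftJoin(parsed, cassandra).collect { case (p, Some(s)) => p -> s }

  // The "incorrect" entries: tickers with no matching row on the Cassandra side.
  def unmatchedTickers(parsed: Seq[Parsed], cassandra: Seq[StockInfo]): Seq[String] =
    leftJoin(parsed, cassandra).collect { case (p, None) => p.ticker }

  def main(args: Array[String]): Unit = {
    val parsed = Seq(Parsed("AAPL", 10), Parsed("XYZ", 5))
    val cassandra = Seq(StockInfo("AAPL", 120.0))
    println(unmatchedTickers(parsed, cassandra))  // List(XYZ)
    println(innerJoin(parsed, cassandra).size)    // 1
  }
}
```

On the real joined DataFrame, the equivalent check would be filtering for rows where the symbol column is null.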

The full source code, with a README, is here.

Upvotes: 0
