Akhil Nair

Reputation: 3274

Scala joinWithCassandraTable result (or CassandraTableScanRDD) to Dataset

I'm using the Datastax spark-cassandra-connector to access some data in Cassandra.

To efficiently access all the data my query needs, I have to use the joinWithCassandraTable method to fetch data from a number of partitions. This gives me an object of class com.datastax.spark.connector.rdd.CassandraTableScanRDD (or similar; for testing I'm actually just using the standard sc.cassandraTable(ks, tbl) method to read data).

The problem is, all the methods I need to use on the resulting object need an object of class org.apache.spark.sql.Dataset.

I have done a lot of searching around and haven't been able to find anything to help - the closest I've found is this similar question, which I don't think has been sufficiently answered, as it ignores the use case where the recommended method of accessing all the necessary data is to use joinWithCassandraTable.

I'm also new to Java and Scala, so sorry if I'm a little slow. Any help would be massively appreciated, as I'm pretty stuck at this point.

Thanks, Akhil

Upvotes: 1

Views: 1273

Answers (1)

RussS

Reputation: 16576

What you can do is read your RDD into an RDD[Row] and then convert that into a DataFrame. The only issue is that we also need the schema, so let's do this in two steps.

First, let's get the schema programmatically from our join target:

import org.apache.spark.sql.cassandra._ // adds cassandraFormat(table, keyspace) to the reader
val schema = spark.read.cassandraFormat("dogabase", "test").load.schema

/**
schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(owner,StringType,true), 
StructField(dog_id,IntegerType,true), 
StructField(dog_age,IntegerType,true), 
StructField(dog_name,StringType,true))
**/

Then we can make org.apache.spark.sql.Row objects out of our Cassandra Driver rows.

import com.datastax.spark.connector._ // adds joinWithCassandraTable to RDDs
import org.apache.spark.sql.Row
val joinResult = 
  sc.parallelize(Seq(Tuple1("Russ")))
    .joinWithCassandraTable("test", "dogabase")
    .map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row

Now that we have a schema and an RDD[Row], we can use the createDataFrame method of the Spark session:

val dataset = spark.createDataFrame(joinResult, schema)
dataset.show

/**
+-----+------+-------+--------+
|owner|dog_id|dog_age|dog_name|
+-----+------+-------+--------+
| Russ|     1|     10|    cara|
| Russ|     2|     11|sundance|
+-----+------+-------+--------+
**/

And just in case you don't believe me that a DataFrame is a Dataset:

dataset.getClass
/**
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset
**/
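
If the downstream methods want a typed Dataset[T] rather than a Dataset[Row], the same two ingredients (an RDD[Row] plus a schema) should still be enough. What follows is a minimal local sketch, not the connector's own method: it swaps the Cassandra join for two hand-written rows copied from the output above, builds the schema by hand instead of reading it from Cassandra, and then calls .as[Dog]. The Dog case class, the RowsToDataset object, and the local[*] master are assumptions made for illustration only.

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical case class mirroring the columns shown above.
// Non-string columns are Options because the schema marks them nullable.
case class Dog(owner: String, dog_id: Option[Int], dog_age: Option[Int], dog_name: String)

object RowsToDataset {
  // Hand-built equivalent of the schema printed earlier (an assumption:
  // in the real job it would come from the Cassandra table).
  val schema: StructType = StructType(Seq(
    StructField("owner", StringType, nullable = true),
    StructField("dog_id", IntegerType, nullable = true),
    StructField("dog_age", IntegerType, nullable = true),
    StructField("dog_name", StringType, nullable = true)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, no cluster
      .appName("rows-to-dataset")
      .getOrCreate()
    import spark.implicits._ // brings the Encoder[Dog] into scope

    // Stand-in for the RDD[Row] produced by the join.
    val rows = spark.sparkContext.parallelize(Seq(
      Row("Russ", 1, 10, "cara"),
      Row("Russ", 2, 11, "sundance")))

    val df = spark.createDataFrame(rows, schema) // Dataset[Row]
    val dogs: Dataset[Dog] = df.as[Dog]          // typed view over the same data

    // Typed operations now work on Dog fields directly.
    dogs.filter(_.dog_age.exists(_ > 10)).show()

    spark.stop()
  }
}
```

In the real job, only the `rows` value would change: it would be the mapped result of joinWithCassandraTable rather than a parallelized Seq.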

EDIT: Converters you may need

Some Cassandra types are not valid values for Spark Rows, so you may need to convert them. This can be done by writing a quick conversion function. Unfortunately, the built-in conversion that the SCC uses produces an internal representation, so we can't reuse those conversions.

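def convertToSpark(element: Any): Any = element match {
  case time: org.joda.time.LocalDate => new java.sql.Date(time.toDateTimeAtStartOfDay().getMillis) //Convert to java.sql.Date, which Spark's DateType expects
  case other => other //Leave everything else untouched
}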

Then, when making your rows:

Row(cassandraRow.columnValues.map(convertToSpark): _*)
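
To see the converter working without a Cassandra cluster, here is a small local sketch. It assumes joda-time is on the classpath, uses a hand-made sequence as a stand-in for cassandraRow.columnValues, and invents an `adopted_on` date column purely for illustration. The converter targets java.sql.Date because, as far as I know, that is the external type Spark accepts for a DateType column in a Row.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}

object ConvertColumnValues {
  // Map values Spark cannot encode onto types it can; pass everything else through.
  def convertToSpark(element: Any): Any = element match {
    case date: org.joda.time.LocalDate =>
      new java.sql.Date(date.toDateTimeAtStartOfDay().getMillis)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, no cluster
      .appName("convert-column-values")
      .getOrCreate()

    // Hypothetical stand-in for cassandraRow.columnValues.
    val columnValues: IndexedSeq[Any] =
      IndexedSeq("Russ", new org.joda.time.LocalDate(2017, 3, 1))

    // Hypothetical schema; `adopted_on` is not a column of the table above.
    val schema = StructType(Seq(
      StructField("owner", StringType, nullable = true),
      StructField("adopted_on", DateType, nullable = true)))

    // Convert every value before unpacking it into a Row.
    val rows = spark.sparkContext.parallelize(
      Seq(Row(columnValues.map(convertToSpark): _*)))

    spark.createDataFrame(rows, schema).show()
    spark.stop()
  }
}
```

Further cases (for example other date or collection types your table uses) can be added to the match as they come up; the fall-through case keeps values Spark already understands unchanged.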

Upvotes: 3
