Reputation: 3274
I'm using the DataStax spark-cassandra-connector to access some data in Cassandra.
To access all the data my query needs efficiently, I have to use the joinWithCassandraTable method to pull data back from a bunch of partitions. This gives me an object of class com.datastax.spark.connector.rdd.CassandraTableScanRDD (or similar; to test, I'm actually just using the standard sc.cassandraTable(ks, tbl) method to read data).
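Roughly, the two access patterns look like this (the keyspace, table, and key values here are just placeholders):
import com.datastax.spark.connector._

// Full-table read I use for testing; returns a CassandraTableScanRDD[CassandraRow]
val scanRdd = sc.cassandraTable("my_keyspace", "my_table")

// Keyed join against specific partitions via joinWithCassandraTable
val joinRdd = sc.parallelize(Seq(Tuple1("some_partition_key")))
  .joinWithCassandraTable("my_keyspace", "my_table")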
The problem is that all the methods I need to use on the resulting object require an object of class org.apache.spark.sql.Dataset.
I have done a lot of searching around and haven't been able to find anything to help. The closest I've found is this similar question, which I don't think has been sufficiently answered, as it ignores the use case where the recommended way to access all the necessary data is joinWithCassandraTable.
I'm also new to Java and Scala, so sorry if I'm a little slow. Any help would be massively appreciated, as I'm pretty stuck at this point.
Thanks, Akhil
Upvotes: 1
Views: 1273
Reputation: 16576
What you can do is read your RDD into an RDD[Row] and then convert that into a DataFrame. Our only issue is that we also need the schema, so let's do this in two steps.
First, let's get the schema programmatically from our join target:
val schema = spark.read.cassandraFormat("dogabase", "test").load.schema
/**
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(owner,StringType,true),
StructField(dog_id,IntegerType,true),
StructField(dog_age,IntegerType,true),
StructField(dog_name,StringType,true))
**/
Then we can make org.apache.spark.sql.Row objects out of the connector's CassandraRow results:
import org.apache.spark.sql.Row
val joinResult =
sc.parallelize(Seq(Tuple1("Russ")))
.joinWithCassandraTable("test", "dogabase")
.map{ case(_, cassandraRow) => Row(cassandraRow.columnValues:_*)} //Unpack our Cassandra row values into a spark.sql.Row
Now that we have a schema and an RDD[Row], we can use the createDataFrame method of the SparkSession:
val dataset = spark.createDataFrame(joinResult, schema)
dataset.show
/**
+-----+------+-------+--------+
|owner|dog_id|dog_age|dog_name|
+-----+------+-------+--------+
| Russ| 1| 10| cara|
| Russ| 2| 11|sundance|
+-----+------+-------+--------+
**/
And just in case you don't believe me that a DataFrame is a Dataset:
dataset.getClass
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.Dataset
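In Spark 2.x, DataFrame is literally just a type alias for Dataset[Row]. If you want a typed Dataset instead, you can go one step further with a case class mirroring the schema above (the DogabaseRow name here is just illustrative):
import spark.implicits._
case class DogabaseRow(owner: String, dog_id: Int, dog_age: Int, dog_name: String)
val typedDataset: org.apache.spark.sql.Dataset[DogabaseRow] = dataset.as[DogabaseRow]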
Some Cassandra types are not a valid basis for Spark Rows, so you may need to convert them. This can be done by writing a quick conversion function. Unfortunately, the built-in conversions that the SCC uses produce an internal representation, so we can't reuse those.
def convertToSpark(element: Any): Any = element match {
  case time: org.joda.time.LocalDate => time.toDateTimeAtStartOfDay().toDate // Convert to java.util.Date
  case other => other
}
Then, when making your rows, apply the conversion to each column value:
Row(cassandraRow.columnValues.map(convertToSpark):_*)
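Putting it together, the mapping step from the earlier join would then look something like this:
import org.apache.spark.sql.Row

val joinResult =
  sc.parallelize(Seq(Tuple1("Russ")))
    .joinWithCassandraTable("test", "dogabase")
    .map { case (_, cassandraRow) =>
      Row(cassandraRow.columnValues.map(convertToSpark): _*) // Convert each value before building the spark.sql.Row
    }
val dataset = spark.createDataFrame(joinResult, schema)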
Upvotes: 3