Reputation: 2517
I am trying to store data from a Spark (v1.6) DataFrame to Cassandra. I managed to get the examples on the Spark Cassandra Connector page running; however, I am stuck with my own code. Consider the following snippet:
case class MyCassandraRow(id: Long, sfl: Seq[Float])

df.map(r =>
  MyCassandraRow(
    r.getAs[Long](0),
    r.getAs[MySeqFloatWrapper]("sfl").getSeq())
).saveToCassandra("keyspace", "table")
MySeqFloatWrapper is a UDT I wrote, which has a method getSeq() that returns a Seq[Float].
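For reference, the wrapper is declared roughly along these lines (a simplified sketch; the UDT class name MySeqFloatWrapperUDT is a placeholder for the actual UDT implementation):

import org.apache.spark.sql.types.SQLUserDefinedType

// Simplified sketch: the user class is tied to its UDT via the annotation.
@SQLUserDefinedType(udt = classOf[MySeqFloatWrapperUDT])
class MySeqFloatWrapper(values: Seq[Float]) extends Serializable {
  def getSeq(): Seq[Float] = values
}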
Unfortunately, if I run this code, I get a
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericMutableRow cannot be cast to java.lang.Byte
Actually, I get this exception even with a much simpler mapping (and also if I use Row(r.getLong(0))), but not if I write:
df.map(r => r.getLong(0)).collect()
On the other hand, if I wrap my case class in a Row and write
df.map(r =>
  Row(
    MyCassandraRow(
      r.getAs[Long](0),
      r.getAs[MySeqFloatWrapper]("sfl").getSeq())
  )
).saveToCassandra("keyspace", "table")
I get the following exception:
scala.ScalaReflectionException: <none> is not a term
Upvotes: 2
Views: 697
Reputation: 2517
I just realized that the ClassCastException was related to my UDT MySeqFloatWrapper and the sqlType I defined there, which apparently had not been taken into account before: with Spark 1.5 it worked fine, but with Spark 1.6 it no longer does (see also SPARK-12878).
If you need a template showing how to define UDTs properly, see the VectorUDT example on GitHub.
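For illustration, here is a minimal sketch of how the UDT behind a wrapper like MySeqFloatWrapper could be defined against the Spark 1.6 API, loosely modeled on the VectorUDT example (the class name and the serialize/deserialize bodies are assumptions, and ArrayData/GenericArrayData are Catalyst internals that have changed between releases):

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

class MySeqFloatWrapperUDT extends UserDefinedType[MySeqFloatWrapper] {

  // Catalyst-facing schema: a non-nullable array of floats.
  override def sqlType: DataType = ArrayType(FloatType, containsNull = false)

  // Convert the user object into Catalyst's internal array representation.
  override def serialize(obj: Any): Any = obj match {
    case w: MySeqFloatWrapper =>
      new GenericArrayData(w.getSeq().map(_.asInstanceOf[Any]).toArray)
  }

  // Convert the internal representation back into the user object.
  override def deserialize(datum: Any): MySeqFloatWrapper = datum match {
    case data: ArrayData => new MySeqFloatWrapper(data.toFloatArray().toSeq)
  }

  override def userClass: Class[MySeqFloatWrapper] = classOf[MySeqFloatWrapper]
}

The important part is that sqlType agrees with what serialize actually produces and with what deserialize expects to receive.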
Upvotes: 3