Reputation: 82
I wrote an app using the Spark Cassandra Connector. Now, when I spark-submit the job, I get the error java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox, even though I defined a type converter as specified in https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md. My thought is that I need a companion object for MailBox where I define a mapper, but I can't find an example for it in the docs. Does anyone know how to solve this? Thanks
The code:
object Test {
  case class Size(size: Long) {
    if (size < 0) throw new IllegalArgumentException
    def +(s: Size): Size = Size(size + s.size)
  }

  object LongToSizeConverter extends TypeConverter[Size] {
    def targetTypeTag = typeTag[Size]
    def convertPF = { case long: Long => Size(long) }
  }

  object SizeToLongConverter extends TypeConverter[Long] {
    def targetTypeTag = typeTag[Long]
    def convertPF = { case Size(long) => long.toLong }
  }

  case class MailBox(id: String, totalsize: Size)
  case class Id(mailboxid: String)

  object StringToIdConverter extends TypeConverter[Id] {
    def targetTypeTag = typeTag[Id]
    def convertPF = {
      case str: String => Id(str)
      case str: UUID   => Id(str.toString)
    }
  }

  object IdToStringConverter extends TypeConverter[String] {
    def targetTypeTag = typeTag[String]
    def convertPF = { case Id(str) => str.toString }
  }

  def main(args: Array[String]) {
    val sc = new SparkContext()
    TypeConverter.registerConverter(StringToIdConverter)
    TypeConverter.registerConverter(IdToStringConverter)
    TypeConverter.registerConverter(LongToSizeConverter)
    TypeConverter.registerConverter(SizeToLongConverter)
    val test = sc.parallelize(Array(MailBox(Id("1"), Size(10))))
    test.saveAsCassandraTable("test", "Mailbox")
  }
}
Upvotes: 1
Views: 1383
Reputation: 16576
First, let me post a quick working example; then I'll walk through what is going wrong.
package com.datastax.spark.example

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector.types._
import scala.reflect.runtime.universe._
import java.util.UUID
import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock

case class Size(size: Long) {
  if (size < 0) throw new IllegalArgumentException
  def +(s: Size): Size = Size(size + s.size)
}
case class MailBox(id: Id, totalsize: Size)
case class Id(mailboxid: String)

object Test {

  // Build the TypeTags while holding the reflection lock; Scala runtime
  // reflection is not thread-safe.
  val LongTypeTag = SparkReflectionLock.synchronized {
    implicitly[TypeTag[java.lang.Long]]
  }
  val SizeTypeTag = SparkReflectionLock.synchronized {
    typeTag[Size]
  }
  val IdTypeTag = SparkReflectionLock.synchronized {
    typeTag[Id]
  }
  val StringTypeTag = SparkReflectionLock.synchronized {
    implicitly[TypeTag[String]]
  }

  object LongToSizeConverter extends TypeConverter[Size] {
    def targetTypeTag = SizeTypeTag
    def convertPF = { case long: Long => Size(long) }
  }
  object SizeToLongConverter extends TypeConverter[java.lang.Long] {
    def targetTypeTag = LongTypeTag
    def convertPF = { case Size(long) => long.toLong }
  }

  object StringToIdConverter extends TypeConverter[Id] {
    def targetTypeTag = IdTypeTag
    def convertPF = {
      case str: String => Id(str)
      case str: UUID   => Id(str.toString)
    }
  }

  object IdToStringConverter extends TypeConverter[String] {
    def targetTypeTag = StringTypeTag
    def convertPF = { case Id(str) => str.toString }
  }

  // Register in the object body (not in main) so the registrations run
  // wherever the object is initialized, including executor JVMs.
  TypeConverter.registerConverter(StringToIdConverter)
  TypeConverter.registerConverter(IdToStringConverter)
  TypeConverter.registerConverter(LongToSizeConverter)
  TypeConverter.registerConverter(SizeToLongConverter)

  def main(args: Array[String]) {
    val sc = new SparkContext()
    val test = sc.parallelize(Array(MailBox(Id("1"), Size(10))))
    test.saveToCassandra("ks", "mailbox")
  }
}
saveAsCassandraTable uses the fromType method, which requires known types (not custom ones). This is because saveAsCassandraTable creates Cassandra columns based on known field types, and with a custom type converter you don't explicitly state the one-to-one mapping between your type and a Cassandra column, so it can't be looked up. Since saveAsCassandraTable creates the Cassandra table before inserting into it, it gets stuck: it doesn't know how to make the table.
To fix this we change the line
test.saveAsCassandraTable("test","Mailbox")
to
test.saveToCassandra("test","mailbox")
where we have pre-made the table in CQLSH, but you could also do this using the Java Driver in your application.
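If you'd rather create the table from the application itself, here is a minimal sketch using the connector's CassandraConnector helper (which wraps the Java driver); the keyspace name, table name, and replication settings are assumptions for illustration:

// Sketch, not from the original answer: pre-creating the table before
// calling saveToCassandra. The schema (text id, bigint totalsize)
// matches what the custom converters produce.
import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
  // Assumed keyspace settings for a local test cluster
  session.execute(
    "CREATE KEYSPACE IF NOT EXISTS test WITH replication = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute(
    "CREATE TABLE IF NOT EXISTS test.mailbox " +
    "(id text PRIMARY KEY, totalsize bigint)")
}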
TypeConverter chaining doesn't work with custom type converters, so we need to provide converters from our custom types directly to Java types. For this I changed the SizeToLongConverter to target java.lang.Long:
object SizeToLongConverter extends TypeConverter[java.lang.Long] {
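As a quick sanity check (a sketch, assuming the converters above have been registered; TypeConverter.forType is the connector's registry lookup), you can ask for a converter and exercise it directly:

// Sketch: looking up the registered converter for the custom Size type
val toSize = TypeConverter.forType[Size]  // should resolve to LongToSizeConverter
println(toSize.convert(10L))              // Size(10)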
I've added synchronized blocks (using SparkReflectionLock) around the TypeTag creation, because Scala runtime reflection is not thread-safe and building the tags concurrently can fail otherwise.
See the SparkReflectionLock.synchronized blocks in the example above.
To make sure our registrations happen on the executor JVMs, I moved them out of the main method and into the body of the Test object, so they run whenever the object is initialized in a JVM. I'm not sure how important this is, but it's best to reflect that registration should happen wherever the code is shipped to, and not just during the main method on the driver.
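If you want to be explicit about it, here is a hypothetical way (not from the original answer) to force the object body, and with it the registerConverter calls, to run on each executor JVM:

// Sketch: referencing the Test object inside a task forces its
// one-time initializer (including the registrations) to run on that JVM.
test.foreachPartition { _ =>
  Test
  ()
}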
Upvotes: 3