Reputation: 7605
I have an RDD[HbaseRecord] which contains a custom complex type, Name. Both classes are defined below:
class HbaseRecord(
  val uuid: String,
  val timestamp: String,
  val name: Name
)
class Name(
  val firstName: String,
  val middleName: String,
  val lastName: String
)
At some point in my code I want to generate a DataFrame from that RDD so that I can save it as an Avro file. I tried the following:
// I get an object from HBase here
val objectRDD: RDD[HbaseRecord] = ...
// I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD: RDD[Row] = objectRDD.map(
  hbaseRecord => {
    val uuid: String = hbaseRecord.uuid
    val timestamp: String = hbaseRecord.timestamp
    val name: Name = hbaseRecord.name
    Row(uuid, timestamp, name)
  })
// Here I define the schema
val schema = new StructType()
  .add("uuid", StringType)
  .add("timestamp", StringType)
  .add("name", new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType))
// Now I try to create a DataFrame using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
But I am getting the following error:
scala.MatchError: (of class java.lang.String)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
  at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
  at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
I tried removing the complex type from the Row, so it would be Row[String, String], and then there is no error. So I assume the problem is with the complex type.
What am I doing wrong? Or what other approach could I follow to generate that DataFrame with the complex type?
Upvotes: 0
Views: 700
Reputation: 1528
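I just used a simple case class for this instead of a class. The name column didn't conform to the schema that was defined: the schema declares name as a nested StructType, so the value in that position needs to be a Row rather than a Name instance. Convert the name column to a Row and it should work.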
val rowRDD: RDD[Row] = objectRDD.map(
  hbaseRecord => {
    val uuid: String = hbaseRecord.uuid
    val timestamp: String = hbaseRecord.timestamp
    // Build a nested Row so the value matches the nested StructType in the schema
    val name = Row(hbaseRecord.name.firstName,
      hbaseRecord.name.middleName, hbaseRecord.name.lastName)
    Row(uuid, timestamp, name)
  })
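For reference, here is a minimal end-to-end sketch of the same idea: the nested struct in the schema is paired with a nested Row in the data. It is only an illustration under a few assumptions that are not in the question: the local[1] master, the app name, the object name NestedRowExample, the toRow helper, and the sample record are all made up, and the classes are written as case classes as mentioned above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructType}

// Same fields as in the question, written as case classes.
case class Name(firstName: String, middleName: String, lastName: String)
case class HbaseRecord(uuid: String, timestamp: String, name: Name)

// Hypothetical wrapper object, used only for this sketch.
object NestedRowExample {
  // Schema of the nested "name" struct.
  val nameSchema: StructType = new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType)

  // Top-level schema; "name" is a struct, so its value must be a Row.
  val schema: StructType = new StructType()
    .add("uuid", StringType)
    .add("timestamp", StringType)
    .add("name", nameSchema)

  // Hypothetical helper: turns one record into a Row with a nested Row.
  def toRow(r: HbaseRecord): Row =
    Row(r.uuid, r.timestamp,
      Row(r.name.firstName, r.name.middleName, r.name.lastName))

  def main(args: Array[String]): Unit = {
    // Assumption: a local Spark context, just to make the sketch runnable.
    val conf = new SparkConf().setMaster("local[1]").setAppName("nested-row-example")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      // Made-up sample data standing in for the records read from HBase.
      val objectRDD: RDD[HbaseRecord] = sc.parallelize(Seq(
        HbaseRecord("id-1", "2016-01-01T00:00:00Z", Name("Ada", "King", "Lovelace"))))

      val rowRDD: RDD[Row] = objectRDD.map(toRow)
      val dataFrame = sqlContext.createDataFrame(rowRDD, schema)

      dataFrame.printSchema()
      dataFrame.select("uuid", "name.lastName").show()
    } finally {
      sc.stop()
    }
  }
}
```

Keeping the schema and the Row-building helper side by side makes it easier to see that every StructType in the schema has a matching Row in the data, which is the mismatch that caused the MatchError in the question.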
Upvotes: 1