Reputation: 7605
This question is a continuation of this other one, where the user who gave the accepted answer suggested I open a new question to explain my remaining doubts.
What I am trying to do is generate a DataFrame from an RDD[Objects], where my objects have primitive types but also complex types. The previous question explained how to parse a complex Map type.
What I tried next was to extrapolate that solution to parse a Map[Map], so that in the DataFrame it becomes an Array(Map).
Below is the code I have written so far:
//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ...

//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD.map(
    hbaseRecord => {
        val uuid : String = hbaseRecord.uuid
        val timestamp : String = hbaseRecord.timestamp

        val name = Row(hbaseRecord.nameMap.firstName.getOrElse(""),
            hbaseRecord.nameMap.middleName.getOrElse(""),
            hbaseRecord.nameMap.lastName.getOrElse(""))

        val contactsMap = hbaseRecord.contactsMap

        val homeContactMap = contactsMap.get("HOME")
        val homeContact = Row(homeContactMap.contactType,
            homeContactMap.areaCode,
            homeContactMap.number)

        val workContactMap = contactsMap.get("WORK")
        val workContact = Row(workContactMap.contactType,
            workContactMap.areaCode,
            workContactMap.number)

        val contacts = Row(homeContact, workContact)

        Row(uuid, timestamp, name, contacts)
    }
)
//Here I define the schema
val schema = new StructType()
    .add("uuid", StringType)
    .add("timestamp", StringType)
    .add("name", new StructType()
        .add("firstName", StringType)
        .add("middleName", StringType)
        .add("lastName", StringType))
    .add("contacts", new StructType(
        Array(
            StructField("contactType", StringType),
            StructField("areaCode", StringType),
            StructField("number", StringType)
        )))
//Now I try to create a DataFrame using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
But I am getting the following error:
19/03/18 12:09:53 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 8)
scala.MatchError: [HOME,05,12345678] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I also tried generating the contacts element as an array:
val contacts = Array(homeContact,workContact)
But then I get the following error instead:
scala.MatchError: [Lorg.apache.spark.sql.Row;@726c6aec (of class [Lorg.apache.spark.sql.Row;)
Can anyone spot the problem?
Upvotes: 1
Views: 719
Reputation: 10406
Let's simplify your situation to your array of contacts. That's where the problem is. You are trying to use this schema:
val schema = new StructType()
    .add("contacts", new StructType(
        Array(
            StructField("contactType", StringType),
            StructField("areaCode", StringType),
            StructField("number", StringType)
        )))
to store a list of contacts. Yet this schema is a single struct type: it cannot contain a list, only one contact. We can verify this with:
spark.createDataFrame(sc.parallelize(Seq[Row]()), schema).printSchema
root
|-- contacts: struct (nullable = true)
| |-- contactType: string (nullable = true)
| |-- areaCode: string (nullable = true)
| |-- number: string (nullable = true)
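To see the difference, note that a single contact does fit this schema. The following sketch (reusing the schema above, and assuming a spark session and sc are available as in the other snippets) succeeds, whereas wrapping several contacts in an outer Row or Array triggers exactly the scala.MatchError you saw:

// A single nested Row matches the lone struct field, so this converts fine
val oneContact = Row(Row("HOME", "05", "12345678"))
spark.createDataFrame(sc.parallelize(Seq(oneContact)), schema).show(false)
// prints a single struct value, e.g. [HOME,05,12345678]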
Indeed, the Array you have in your code is just meant to contain the fields of your "contacts" struct type. To achieve what you want, a dedicated type exists: ArrayType. This yields a slightly different result:
val schema_ok = new StructType()
    .add("contacts", ArrayType(new StructType(Array(
        StructField("contactType", StringType),
        StructField("areaCode", StringType),
        StructField("number", StringType)))))
spark.createDataFrame(sc.parallelize(Seq[Row]()), schema_ok).printSchema
root
|-- contacts: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- contactType: string (nullable = true)
| | |-- areaCode: string (nullable = true)
| | |-- number: string (nullable = true)
and it works:
val row = Row(Array(
    Row("type", "code", "number"),
    Row("type2", "code2", "number2")))
spark.createDataFrame(sc.parallelize(Seq(row)), schema_ok).show(false)
+-------------------------------------------+
|contacts |
+-------------------------------------------+
|[[type,code,number], [type2,code2,number2]]|
+-------------------------------------------+
So if you update the schema with this version of "contacts", just replace val contacts = Row(homeContact, workContact) with val contacts = Array(homeContact, workContact) and it should work.
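For reference, the full schema from your question would then look like this (a sketch assembled from the field names above, so treat it as a starting point rather than tested code):

val schema_full = new StructType()
    .add("uuid", StringType)
    .add("timestamp", StringType)
    .add("name", new StructType()
        .add("firstName", StringType)
        .add("middleName", StringType)
        .add("lastName", StringType))
    .add("contacts", ArrayType(new StructType(Array(
        StructField("contactType", StringType),
        StructField("areaCode", StringType),
        StructField("number", StringType)))))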
NB: if you want to label your contacts (with HOME or WORK), there exists a MapType as well.
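A minimal sketch of that variant, assuming you want to key each contact struct by its label:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical schema keying each contact struct by a string label
val schema_map = new StructType()
    .add("contacts", MapType(StringType, new StructType(Array(
        StructField("contactType", StringType),
        StructField("areaCode", StringType),
        StructField("number", StringType)))))

// Each contact Row is keyed by "HOME" or "WORK" instead of sitting in an array
val row_map = Row(Map(
    "HOME" -> Row("HOME", "05", "12345678"),
    "WORK" -> Row("WORK", "06", "87654321")))

spark.createDataFrame(sc.parallelize(Seq(row_map)), schema_map).show(false)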
Upvotes: 2