Reputation: 21
I am using the MongoDB-Hadoop connector to read a collection that contains embedded documents.
JSON collection: PersonaMetaData

{
    "user_id" : NumberLong(2),
    "persona_created" : true,
    "persona_createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
    "persona" : [{
        "persona_type" : 1,
        "created_using_algo" : "Name of the algo",
        "version_algo" : "1.0",
        "createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
        "persona_items" : {"key1" : "value1", "key2" : "value2"}
    }]
}
I have created the following classes to represent the data in the collection:
class Persona_Items implements Serializable
{
    private int key1;
    private String key2;
    // Getter/Setter and constructor
}
class Persona implements Serializable
{
    String persona_type;
    String created_using_algo;
    String version_algo;
    long createdAt;
    List<Persona_Items> listPersonaItems;
    // Getter/setter and constructor
}
class PersonaMetaData implements Serializable
{
    long user_id;
    boolean persona_created;
    long persona_createdAt;
    List<Persona> listPersona;
    // Getter/setter and constructor
}
and then used them as follows:
// RDD representing the complete collection
JavaPairRDD<Object, BSONObject> bsonRdd = sc.newAPIHadoopRDD(inputConfig,
        com.mongodb.hadoop.MongoInputFormat.class,
        Object.class, BSONObject.class);

// Get RDD of PersonaMetaData
JavaRDD<PersonaMetaData> metaDataSchemaJavaRDD =
        bsonRdd.map(new Function<Tuple2<Object, BSONObject>, PersonaMetaData>() {
            @Override
            public PersonaMetaData call(Tuple2<Object, BSONObject> objectBSONObjectTuple2)
                    throws Exception {
                // Parse the BSON object and return a new PersonaMetaData object
            }
        });

// Convert into DataFrame
DataFrame dataFrame = sqlContext.createDataFrame(metaDataSchemaJavaRDD,
        PersonaMetaData.class);
Exception:

scala.MatchError: io.abc.spark.schema.PersonaMetaData@31ff5060 (of class io.abc.spark.schema.PersonaMetaData)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:498)
If the classes do not contain any List fields, the same code runs without any issues.
Upvotes: 2
Views: 2195
Reputation: 330193
As it is clearly stated in the Inferring the Schema Using Reflection section of the Spark SQL, DataFrames and Datasets Guide:
Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays.
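A common workaround on Spark 1.x is therefore to specify the schema programmatically and map each bean to a Row yourself before calling createDataFrame. The sketch below is only illustrative: the getter names (getListPersona(), getListPersonaItems(), isPersona_created(), ...) and the exact field types are assumptions based on the classes shown in the question, so adjust them to your actual code.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Explicit schema mirroring the Persona_Items / Persona / PersonaMetaData classes above
StructType personaItemsSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("key1", DataTypes.IntegerType, true),
        DataTypes.createStructField("key2", DataTypes.StringType, true)));

StructType personaSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("persona_type", DataTypes.StringType, true),
        DataTypes.createStructField("created_using_algo", DataTypes.StringType, true),
        DataTypes.createStructField("version_algo", DataTypes.StringType, true),
        DataTypes.createStructField("createdAt", DataTypes.LongType, true),
        DataTypes.createStructField("persona_items",
                DataTypes.createArrayType(personaItemsSchema), true)));

StructType metaDataSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("user_id", DataTypes.LongType, true),
        DataTypes.createStructField("persona_created", DataTypes.BooleanType, true),
        DataTypes.createStructField("persona_createdAt", DataTypes.LongType, true),
        DataTypes.createStructField("persona",
                DataTypes.createArrayType(personaSchema), true)));

// Convert each bean into a Row whose nested lists become arrays of Rows
// (getter names are assumed from the "// Getter/setter" comments in the question)
JavaRDD<Row> rowRdd = metaDataSchemaJavaRDD.map(new Function<PersonaMetaData, Row>() {
    @Override
    public Row call(PersonaMetaData meta) {
        List<Persona> personas = meta.getListPersona();
        Row[] personaRows = new Row[personas.size()];
        for (int i = 0; i < personaRows.length; i++) {
            Persona p = personas.get(i);
            List<Persona_Items> items = p.getListPersonaItems();
            Row[] itemRows = new Row[items.size()];
            for (int j = 0; j < itemRows.length; j++) {
                Persona_Items item = items.get(j);
                itemRows[j] = RowFactory.create(item.getKey1(), item.getKey2());
            }
            personaRows[i] = RowFactory.create(p.getPersona_type(), p.getCreated_using_algo(),
                    p.getVersion_algo(), p.getCreatedAt(), itemRows);
        }
        return RowFactory.create(meta.getUser_id(), meta.isPersona_created(),
                meta.getPersona_createdAt(), personaRows);
    }
});

DataFrame dataFrame = sqlContext.createDataFrame(rowRdd, metaDataSchema);

With the types spelled out explicitly, Catalyst no longer has to infer a schema from the bean through reflection, which is the step that fails for beans containing List fields.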
Upvotes: 2