Reputation: 2245
I can't seem to write a JavaRDD<T> to Parquet, where T is, say, a Person class. I've defined it as:
public class Person implements Serializable
{
private static final long serialVersionUID = 1L;
private String name;
private String age;
private Address address;
....
with Address:
public class Address implements Serializable
{
private static final long serialVersionUID = 1L;
private String City;
private String Block;
...<getters and setters>
I then create a JavaRDD like so:
JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
{
public Person call(String line)
{
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge("2");
Address address = new Address("HomeAdd","141H");
person.setAddress(address);
return person;
}
});
Note: I am manually setting the same Address for every Person, so this is effectively an RDD of nested beans. When I try to save this as a Parquet file:
DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");
The Address class is:
import java.io.Serializable;
public class Address implements Serializable
{
public Address(String city, String block)
{
super();
City = city;
Block = block;
}
private static final long serialVersionUID = 1L;
private String City;
private String Block;
//Omitting getters and setters
}
I encounter the error:
Caused by: java.lang.ClassCastException: com.test.schema.Address cannot be cast to org.apache.spark.sql.Row
I am running spark-1.4.1. Even when I register the DataFrame as a table and query only the nested field:
DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable");
I still get the same error. So what gives? How can I read a complex data structure from a text file and save it as Parquet? It seems I cannot.
Upvotes: 3
Views: 4238
Reputation: 1532
You are using the Java API, which has a limitation here. From the Spark documentation: http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds
"Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields."
With Scala case classes it will work (updated to write to Parquet format):
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
case class Address(city:String, block:String);
case class Person(name:String,age:String, address:Address);
object Test2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext.implicits._
val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d"))));
val df = sqlContext.createDataFrame(people);
df.write.mode("overwrite").parquet("/tmp/people.parquet")
}
}
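If you need to stay in the Java API, one possible workaround (just a sketch, not something I have run against 1.4.1; the class name is made up and the paths and field values are copied from the question) is to skip bean reflection entirely and build Row objects against an explicit StructType schema, since nested structs are supported that way:
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PeopleToParquet {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "people-to-parquet");
        SQLContext sqlContext = new SQLContext(sc);

        // Explicit schema: the address field is a nested struct.
        StructType addressType = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("city", DataTypes.StringType, true),
                DataTypes.createStructField("block", DataTypes.StringType, true)));
        StructType personType = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.StringType, true),
                DataTypes.createStructField("address", addressType, true)));

        // Build Rows directly instead of beans; a nested struct is just a nested Row.
        JavaRDD<Row> rows = sc.textFile("/user/johndoe/spark/data/people.txt")
                .map(new Function<String, Row>() {
                    public Row call(String line) {
                        String[] parts = line.split(",");
                        return RowFactory.create(parts[0], "2",
                                RowFactory.create("HomeAdd", "141H"));
                    }
                });

        DataFrame df = sqlContext.createDataFrame(rows, personType);
        df.write().parquet("/user/johndoe/spark/data/out/people.parquet");
    }
}
Reading it back with sqlContext.read().parquet(...) should then give you the same nested schema, so address.city becomes queryable.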
Upvotes: 3