Joseph

Reputation: 2245

Spark SQL: Nested classes to parquet error

I can't seem to write a JavaRDD<T> to parquet, where T is, say, a Person class. I've defined it as:

public class Person implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String name;
    private String age;
    private Address address;
....

with Address:

public class Address implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String City;
    private String Block;
    ...<getters and setters>

I then create a JavaRDD like so:

JavaRDD<Person> people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function<String, Person>()
    {
        public Person call(String line)
        {
            String[] parts = line.split(",");
            Person person = new Person();
            person.setName(parts[0]);
            person.setAge("2");
            Address address = new Address("HomeAdd","141H");
            person.setAddress(address);
            return person;
        }
    });

Note - I am manually setting the same Address for everyone. This is basically an RDD of beans with a nested bean field. On trying to save this as a parquet file:

DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class);
dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");    

The full Address class is:

import java.io.Serializable;
public class Address implements Serializable
{
    public Address(String city, String block)
    {
        super();
        City = city;
        Block = block;
    }
    private static final long serialVersionUID = 1L;
    private String City;
    private String Block;
    //Omitting getters and setters
}

I encounter the error:

Caused by: java.lang.ClassCastException: com.test.schema.Address cannot be cast to org.apache.spark.sql.Row

I am running Spark 1.4.1.

So what gives? How can I read a complex data structure from a text file and save it as parquet? It seems I cannot do so.

Upvotes: 3

Views: 4238

Answers (1)

Igor Berman

Reputation: 1532

You are using the Java API, which has a limitation here.

From the Spark documentation: http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds

"Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields."

With Scala case classes it will work (updated to write to parquet format):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Nested case classes work: Spark SQL derives the schema (including the
// nested address struct) via Scala reflection.
case class Address(city: String, block: String)
case class Person(name: String, age: String, address: Address)

object Test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val people = sc.parallelize(List(
      Person("a", "b", Address("a", "b")),
      Person("c", "d", Address("c", "d"))))

    val df = sqlContext.createDataFrame(people)
    df.write.mode("overwrite").parquet("/tmp/people.parquet")
  }
}
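
If you need to stay in the Java API, one workaround (a minimal sketch, assuming the Person and Address beans from the question expose getName(), getAge(), getAddress(), getCity() and getBlock()) is to skip the JavaBean reflection path entirely: convert each bean into an org.apache.spark.sql.Row, nesting another Row for the address, and hand createDataFrame an explicit StructType that describes the nested structure.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Reuses the JavaRDD<Person> "people" and the sqlContext from the question.

// Declare the nested schema explicitly: address is a struct inside person.
StructType addressSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("city", DataTypes.StringType, true),
        DataTypes.createStructField("block", DataTypes.StringType, true)));

StructType personSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("age", DataTypes.StringType, true),
        DataTypes.createStructField("address", addressSchema, true)));

// Map each bean to a Row, nesting a Row for the address field.
JavaRDD<Row> rows = people.map(new Function<Person, Row>()
{
    public Row call(Person p)
    {
        Row address = RowFactory.create(p.getAddress().getCity(), p.getAddress().getBlock());
        return RowFactory.create(p.getName(), p.getAge(), address);
    }
});

DataFrame df = sqlContext.createDataFrame(rows, personSchema);
df.write().parquet("/user/johndoe/spark/data/out/people.parquet");

The field names in the StructType ("city", "block", ...) are just illustrative; Parquet will use whatever names you declare there. You can sanity-check the result with sqlContext.read().parquet(...).printSchema(), which should show address as a nested struct.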

Upvotes: 3
