Pradeep

Reputation: 870

How to create encoder for custom Java objects?

I am using the following classes to create a bean with Spark Encoders:

public class OuterClass implements Serializable {
    int id;
    ArrayList<InnerClass> listofInner;
    
    public int getId() {
        return id;
    }
    
    public void setId(int num) {
        this.id = num;
    }

    public ArrayList<InnerClass> getListofInner() {
        return listofInner;
    }
    
    public void setListofInner(ArrayList<InnerClass> list) {
        this.listofInner = list;
    }
}

public static class InnerClass implements Serializable {
    String streetno;
    
    public void setStreetno(String streetno) {
        this.streetno= streetno;
    }

    public String getStreetno() {
        return streetno;
    }
}

Encoder<OuterClass> outerClassEncoder = Encoders.bean(OuterClass.class);
Dataset<OuterClass> ds = spark.createDataset(Collections.singletonList(outerclassList), outerClassEncoder);

And I am getting the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot infer type for class OuterClass$InnerClass because it is not bean-compliant

How can I implement this type of use case for Spark in Java? It worked fine when I removed the inner class, but I need the inner class for my use case.

Upvotes: 6

Views: 19763

Answers (1)

abaghel

Reputation: 15307

Your JavaBean class should have a public no-argument constructor and public getters and setters, and it should implement the Serializable interface. Spark SQL works only on valid JavaBean classes.
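You can check bean compliance without Spark at all, because Encoders.bean discovers properties through the standard JavaBeans introspection machinery. A minimal sketch using java.beans.Introspector from the JDK (the class and property names here are illustrative, not from Spark):

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;

public class BeanCheck {
    // A bean-compliant nested class: public, static, Serializable,
    // with an implicit public no-arg constructor and a getter/setter pair.
    public static class InnerClass implements Serializable {
        private String streetno;

        public String getStreetno() { return streetno; }
        public void setStreetno(String streetno) { this.streetno = streetno; }
    }

    public static void main(String[] args) throws Exception {
        // List the properties the JavaBeans introspector can discover.
        for (PropertyDescriptor pd : Introspector.getBeanInfo(InnerClass.class)
                                                 .getPropertyDescriptors()) {
            System.out.println(pd.getName() + " -> " + pd.getPropertyType());
        }
    }
}
```

If a property such as streetno does not appear in this listing (for example because a getter or setter is missing or not public), a bean-based framework will not be able to infer a field for it either.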

EDIT: Adding a working sample with the inner class.

OuterInnerDF.java

package com.abaghel.examples;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.abaghel.examples.OuterClass.InnerClass;

public class OuterInnerDF {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("OuterInnerDF")
            .config("spark.sql.warehouse.dir", "/file:C:/temp")
            .master("local[2]")
            .getOrCreate();

        System.out.println("====> Create DataFrame");
        //Outer
        OuterClass us = new OuterClass();
        us.setId(111);      
        //Inner
        OuterClass.InnerClass ic = new OuterClass.InnerClass();
        ic.setStreetno("My Street");
        //list
        ArrayList<InnerClass> ar = new ArrayList<InnerClass>();
        ar.add(ic);      
        us.setListofInner(ar);   
        //DF
        Encoder<OuterClass> outerClassEncoder = Encoders.bean(OuterClass.class);         
        Dataset<OuterClass> ds = spark.createDataset(Collections.singletonList(us), outerClassEncoder);
        ds.show();
    }
}

OuterClass.java

package com.abaghel.examples;

import java.io.Serializable;
import java.util.ArrayList;

public class OuterClass implements Serializable {
    int id;
    ArrayList<InnerClass> listofInner;

    public int getId() {
        return id;
    }

    public void setId(int num) {
        this.id = num;
    }

    public ArrayList<InnerClass> getListofInner() {
        return listofInner;
    }

    public void setListofInner(ArrayList<InnerClass> list) {
        this.listofInner = list;
    }

    public static class InnerClass implements Serializable {
        String streetno;

        public void setStreetno(String streetno) {
            this.streetno = streetno;
        }

        public String getStreetno() {
            return streetno;
        }
    }
}

Console Output

====> Create DataFrame
16/08/28 18:02:55 INFO CodeGenerator: Code generated in 32.516369 ms
+---+-------------+
| id|  listofInner|
+---+-------------+
|111|[[My Street]]|
+---+-------------+

Upvotes: 8
