Ziv Gabovitch

Reputation: 775

Applying a schema to a Spark Dataset of a Java object

There's a similar question here: How to add a schema to a Dataset in Spark?

However, my issue is that I already have a predefined Dataset<Obj1>, and I want to define a schema that matches its data members. The end goal is to be able to join two Datasets of Java objects.

Sample code:

Dataset<Row> rowDataset = spark.getSpark().sqlContext().createDataFrame(rowRDD, schema).toDF();
Dataset<MyObj> objResult = rowDataset.map((MapFunction<Row, MyObj>) row ->
        new MyObj(
                row.getInt(row.fieldIndex("field1")),
                row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2")),
                row.isNullAt(row.fieldIndex("field3")) ? "" : row.getString(row.fieldIndex("field3")),
                row.isNullAt(row.fieldIndex("field4")) ? "" : row.getString(row.fieldIndex("field4"))
        ), Encoders.javaSerialization(MyObj.class));

If I print the schema of the row Dataset, I get the expected schema:

rowDataset.printSchema();

root
 |-- field1: integer (nullable = false)
 |-- field2: string (nullable = false)
 |-- field3: string (nullable = false)
 |-- field4: string (nullable = false)

If I print the schema of the object Dataset, the actual schema is lost:

objResult.printSchema();

root
 |-- value: binary (nullable = true)
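A quick way to see where the single binary column comes from is to ask each encoder for its schema directly. The sketch assumes a hypothetical two-field `MyObj` bean (the real class isn't shown in the question) and relies only on `Encoder.schema()`, so no SparkSession is needed:

```java
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;

import java.io.Serializable;

public class EncoderSchemas {
    // Hypothetical stand-in for the question's MyObj: a JavaBean with a
    // public no-arg constructor and a getter/setter pair per field.
    public static class MyObj implements Serializable {
        private int field1;
        private String field2;

        public MyObj() { }

        public int getField1() { return field1; }
        public void setField1(int field1) { this.field1 = field1; }
        public String getField2() { return field2; }
        public void setField2(String field2) { this.field2 = field2; }
    }

    // Schema used when each object is Java-serialized into one column
    public static StructType javaSerializationSchema() {
        return Encoders.javaSerialization(MyObj.class).schema();
    }

    // Schema derived from the bean's getter/setter pairs
    public static StructType beanSchema() {
        return Encoders.bean(MyObj.class).schema();
    }

    public static void main(String[] args) {
        System.out.println(javaSerializationSchema().treeString());
        System.out.println(beanSchema().treeString());
    }
}
```

The first schema is the single `value: binary` column seen above; the second has one column per bean property, which is what later column-based operations such as joins need.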

So how can I apply a schema to the Dataset<MyObj>?

Upvotes: 1

Views: 4042

Answers (1)

Rahul Sharma

Reputation: 5834

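Below is a code snippet I tried, and Spark behaves as expected, so the map function itself is not the root cause of your problem. The difference is the encoder: Encoders.javaSerialization(MyObj.class) stores each object as a single binary column named value, while Encoders.bean(...) derives one column per bean property (the class needs a public no-arg constructor plus getters and setters).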

SparkSession session = SparkSession.builder().config(conf).getOrCreate();
// read().text() yields one string column ("value") per line of input
Dataset<Row> ds = session.read().text("<some path>");
// Bean encoder: one column per getter/setter pair of Employee
Encoder<Employee> employeeEncode = Encoders.bean(Employee.class);
ds.map(new MapFunction<Row, Employee>() {
    @Override
    public Employee call(Row value) throws Exception {
        // Each line is expected to look like "name,age"
        return new Employee(value.getString(0).split(","));
    }
}, employeeEncode).printSchema();

Output:

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

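// Employee bean: Encoders.bean needs a public no-arg constructor
// and a getter/setter pair for every field that should become a column
public class Employee {
    private String name;
    private Integer age;

    public Employee() {
    }

    // Builds an Employee from a split "name,age" line; trim() tolerates spaces
    public Employee(String[] args) {
        this.name = args[0].trim();
        this.age = Integer.parseInt(args[1].trim());
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}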
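Applied to the question's case, the same idea might look like the following sketch. It assumes a hypothetical two-field `MyObj` bean, a second hypothetical bean `Other` that shares `field1` as a join key, and a local session (`local[*]`); none of these come from the original post. Switching the encoder from `Encoders.javaSerialization` to `Encoders.bean` keeps named columns, which is what makes a typed `joinWith` on `field1` possible:

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;

public class BeanJoinSketch {
    // Hypothetical bean standing in for the question's MyObj (two fields only)
    public static class MyObj implements Serializable {
        private int field1;
        private String field2;
        public MyObj() { }
        public MyObj(int field1, String field2) { this.field1 = field1; this.field2 = field2; }
        public int getField1() { return field1; }
        public void setField1(int field1) { this.field1 = field1; }
        public String getField2() { return field2; }
        public void setField2(String field2) { this.field2 = field2; }
    }

    // Hypothetical second bean to join against on field1
    public static class Other implements Serializable {
        private int field1;
        private String label;
        public Other() { }
        public Other(int field1, String label) { this.field1 = field1; this.label = label; }
        public int getField1() { return field1; }
        public void setField1(int field1) { this.field1 = field1; }
        public String getLabel() { return label; }
        public void setLabel(String label) { this.label = label; }
    }

    // Builds both typed Datasets and returns the number of joined pairs
    public static long joinCount(SparkSession spark) {
        StructType schema = new StructType()
                .add("field1", DataTypes.IntegerType, false)
                .add("field2", DataTypes.StringType, true);
        Dataset<Row> rows = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, "a"), RowFactory.create(2, null)), schema);

        // Same map as in the question, but with a bean encoder
        Encoder<MyObj> myObjEncoder = Encoders.bean(MyObj.class);
        Dataset<MyObj> objs = rows.map((MapFunction<Row, MyObj>) row -> new MyObj(
                row.getInt(row.fieldIndex("field1")),
                row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2"))),
                myObjEncoder);
        objs.printSchema(); // one column per bean property instead of value: binary

        Dataset<Other> others = spark.createDataset(
                Arrays.asList(new Other(1, "x"), new Other(3, "y")), Encoders.bean(Other.class));

        // Typed join: each result row is a (MyObj, Other) pair
        Dataset<Tuple2<MyObj, Other>> joined =
                objs.joinWith(others, objs.col("field1").equalTo(others.col("field1")));
        return joined.count();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("bean-join-sketch").getOrCreate();
        System.out.println(joinCount(spark));
        spark.stop();
    }
}
```

If the Row column names already match the bean's property names, `rowDataset.as(Encoders.bean(MyObj.class))` should give the same kind of typed Dataset without the explicit `map`, although null strings are then kept as null rather than replaced with `""`.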

Upvotes: 2
