ernitingoel

Reputation: 661

org.apache.spark.sql.AnalysisException: No such struct field

I am reading a parquet file like this using Java Spark

Dataset<MyData> myDataDS = sparkSession.read().parquet(myParquetFile)
                        .as(Encoders.bean(MyData.class));

It works fine if the schema of myParquetFile matches the MyData class exactly. However, if I add a new field, e.g. myId (even though its value is null), to the MyData class, then I need to regenerate the parquet file, otherwise it throws an exception like

Caused by: org.apache.spark.sql.AnalysisException: No such struct field myId

Is there a way I can skip the null values to get past this error without regenerating the parquet file?

Upvotes: 2

Views: 20288

Answers (2)

Vincent Doba

Reputation: 5078

When reading parquet, Spark by default uses the schema embedded in the parquet files to read the data. Since the schema lives inside the parquet files themselves (contrary to, for instance, reading with an externally supplied Avro reader schema), you would normally have to regenerate the parquet files to change the schema.

However, instead of letting Spark infer the schema from the files, you can provide the schema yourself through the DataFrameReader's .schema() method. In that case, Spark ignores the schema stored in the parquet files and uses the one you provided.

So the solution is to pass the schema extracted from your target bean class to Spark's DataFrameReader:

Dataset<MyData> myDataDS = sparkSession.read()
    .schema(Encoders.bean(MyData.class).schema())
    .parquet(myParquetFile)
    .as(Encoders.bean(MyData.class));

The AnalysisException is no longer thrown, and you get a dataset with a "myId" column set to null.

Upvotes: 4

Som

Reputation: 6338

A brute-force approach to solve this:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;

Dataset<Row> parquet = spark.read()
        .parquet(
                getClass().getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
                        ".parquet").getPath()
        );
parquet.show(false);
/**
 * +------+
 * |price |
 * +------+
 * |123.15|
 * +------+
 */

// Build the select list from the bean schema: keep columns that exist in the
// file, and substitute a typed null literal for any field the file is missing
StructType schema = Encoders.bean(MyData.class).schema();
List<String> columns = Arrays.stream(parquet.columns()).collect(Collectors.toList());
List<Column> columnList = JavaConverters.asJavaCollectionConverter(schema).asJavaCollection().stream()
        .map(f -> columns.contains(f.name()) ? col(f.name()) : lit(null).cast(f.dataType()).as(f.name()))
        .collect(Collectors.toList());
Dataset<MyData> myDataDS =
        parquet.select(JavaConverters.asScalaBufferConverter(columnList).asScala()).as(Encoders.bean(MyData.class));
myDataDS.show(false);
myDataDS.printSchema();
/**
 * +----+------+
 * |myId|price |
 * +----+------+
 * |null|123.15|
 * +----+------+
 *
 * root
 *  |-- myId: string (nullable = true)
 *  |-- price: decimal(5,2) (nullable = true)
 */

MyData.java


public class MyData {
    private double price;
    private String myId;

    public String getMyId() {
        return myId;
    }

    public void setMyId(String myId) {
        this.myId = myId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
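The heart of the snippet above is a per-field merge: for every field the bean declares, keep the parquet column if it exists, otherwise substitute a null cast to the field's type. A minimal plain-Java sketch of that merge, with no Spark dependency (the class and method names `SchemaMerge`/`selectExpressions` are made up for illustration; the strings it produces happen to be valid Spark SQL expressions that could be fed to `Dataset.selectExpr`):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SchemaMerge {
    // For each field the target schema expects, keep the existing column,
    // or substitute a null literal cast to the field's declared type.
    static List<String> selectExpressions(List<String> targetFields,
                                          Map<String, String> targetTypes,
                                          Set<String> existingColumns) {
        return targetFields.stream()
                .map(f -> existingColumns.contains(f)
                        ? f
                        : "CAST(NULL AS " + targetTypes.get(f) + ") AS " + f)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The parquet file only has "price"; the bean also declares "myId"
        List<String> fields = List.of("myId", "price");
        Map<String, String> types = Map.of("myId", "STRING", "price", "DECIMAL(5,2)");
        Set<String> existing = Set.of("price");
        System.out.println(selectExpressions(fields, types, existing));
        // [CAST(NULL AS STRING) AS myId, price]
    }
}
```

The same idea generalizes: as long as the select list is built from the *target* schema rather than the file's schema, any field missing from the file comes back as an all-null column of the right type.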

Upvotes: 1
