Reputation: 661
I am reading a parquet file like this using Java Spark
Dataset<MyData> myDataDS = sparkSession.read().parquet(myParquetFile)
    .as(Encoders.bean(MyData.class));
This works fine if the schema of myParquetFile matches the class MyData exactly. However, if I add a new field, e.g. myId, to the MyData class (even though its value is null), then I need to regenerate the parquet file; otherwise it throws an exception like
Caused by: org.apache.spark.sql.AnalysisException: No such struct field myId
Is there a way I can skip the null values to get past this error without regenerating the parquet file?
Upvotes: 2
Views: 20288
Reputation: 5078
When reading parquet, by default Spark uses the schema contained in the parquet files to read the data. Since, unlike the Avro format for instance, the schema is stored in the parquet files themselves, you must regenerate the parquet files if you want to change the schema.
However, instead of letting Spark infer the schema, you can provide a schema to Spark's DataFrameReader
by using the method .schema()
. In this case, Spark will ignore the schema defined in the parquet files and use the schema you provide.
So the solution is to pass the schema extracted from your target class to Spark's DataFrameReader
:
Dataset<MyData> myDataDS = sparkSession.read()
    .schema(Encoders.bean(MyData.class).schema())
    .parquet(myParquetFile)
    .as(Encoders.bean(MyData.class));
The AnalysisException
is not thrown and you get a dataset with a column "myId" set to null.
Upvotes: 4
Reputation: 6338
A brute-force approach to solve this:
// Read the parquet file as-is; its schema does not contain the new field myId
Dataset<Row> parquet = spark.read()
        .parquet(getClass()
                .getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
                        ".parquet")
                .getPath());
parquet.show(false);
/**
 * +------+
 * |price |
 * +------+
 * |123.15|
 * +------+
 */
// For every field in the bean schema, select the existing column if present,
// otherwise substitute a typed null literal under the field's name
StructType schema = Encoders.bean(MyData.class).schema();
List<String> columns = Arrays.stream(parquet.columns()).collect(Collectors.toList());
List<Column> columnList = JavaConverters.asJavaCollectionConverter(schema).asJavaCollection().stream()
        .map(f -> columns.contains(f.name()) ? col(f.name()) : lit(null).cast(f.dataType()).as(f.name()))
        .collect(Collectors.toList());
Dataset<MyData> myDataDS = parquet
        .select(JavaConverters.asScalaBufferConverter(columnList).asScala())
        .as(Encoders.bean(MyData.class));
myDataDS.show(false);
myDataDS.printSchema();
/**
* +----+------+
* |myId|price |
* +----+------+
* |null|123.15|
* +----+------+
*
* root
* |-- myId: string (nullable = true)
* |-- price: decimal(5,2) (nullable = true)
*/
public class MyData {
    private double price;
    private String myId;

    public String getMyId() {
        return myId;
    }

    public void setMyId(String myId) {
        this.myId = myId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
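The column-selection step above is the heart of this approach: for each field the bean schema expects, keep the existing column, or fall back to a null cast to the field's type. Stripped of the Spark API, the decision logic can be sketched in plain Java (hypothetical class and method names, not part of the answer's code); it builds SQL expression strings of the kind you could also pass to Spark's Dataset.selectExpr(...):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ColumnFill {
    // For each [name, type] pair the target schema expects, emit either the
    // column name (if the file already has it) or a typed-null expression.
    static List<String> selectExpressions(List<String> fileColumns,
                                          List<String[]> schemaFields) {
        return schemaFields.stream()
                .map(f -> fileColumns.contains(f[0])
                        ? f[0]
                        : "CAST(NULL AS " + f[1] + ") AS " + f[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fileColumns = Arrays.asList("price");
        List<String[]> schemaFields = Arrays.asList(
                new String[]{"myId", "STRING"},
                new String[]{"price", "DOUBLE"});
        System.out.println(selectExpressions(fileColumns, schemaFields));
        // [CAST(NULL AS STRING) AS myId, price]
    }
}
```

The answer's code does the same thing with Column objects (lit(null).cast(...)) instead of expression strings, which avoids string escaping issues for unusual column names.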
Upvotes: 1