Reputation: 591
I'm testing some prototype application. We have json data with nested fields. I'm trying to pull some field using following json and code:
Feed: {name: "test",[Record: {id: 1 AllColumns: {ColA: "1",ColB: "2"}}...]}
Dataset<Row> completeRecord = sparkSession.read().json(inputPath);
final Dataset<Row> feed = completeRecord.select(completeRecord.col("Feed.Record.AllColumns"));
I have around 2000 files with such records. I have tested some files individually and they are working fine. But for some file I am getting below error on second line:
org.apache.spark.sql.AnalysisException: Can't extract value from Feed#8.Record: need struct type but got string;
I'm not sure what is going on here. But I would like to either handle this error gracefully and log which file has that record. Also, is there any way to ignore this and continue with rest of the files?
Upvotes: 4
Views: 6216
Reputation: 591
Answering my own question based on what I have learned. There are couple of ways to solve it. Spark provides options to ignore corrupt files and corrupt records.
To ignore corrupt files one can set following flag to true:
spark.sql.files.ignoreCorruptFiles=true
For more fine grained control and to ignore bad records instead of ignoring the complete file. You can use one of three modes that Spark api provides.
According to DataFrameReader api
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a new field configured by columnNameOfCorruptRecord. When a schema is set by user, it sets null for extra fields.
DROPMALFORMED : ignores the whole corrupted records.
FAILFAST : throws an exception when it meets corrupted records.
PERMISSIVE mode worked really well for me but when I provided my own schema Spark filled missing attributes with null instead of marking it corrupt record.
Upvotes: 7
Reputation: 14895
The exception says that one of the json files differs in its structure and that the path Feed.Record.AllColumns
does not exist in this specific file.
Based on this method
private boolean pathExists(Dataset<Row> df, String path) {
try {
df.apply(path);
return true;
}
catch(Exception ex){
return false;
}
}
you can decide if you execute the select
or log an error message:
if(pathExists(completeRecord, "Feed.Record.AllColumns") {
final Dataset<Row> feed = completeRecord.select(completeRecord.col("Feed.Record.AllColumns"));
//continue with processing
}
else {
//log error message
}
Upvotes: 2