Reputation: 173
I encountered the problem while trying to use Spark to simply read a CSV file. After such an operation I would like to ensure that:
- the data types are correct (using the provided schema),
- the headers are correct against the provided schema.
Here is the code I use and have problems with:
import org.apache.spark.sql.Encoders

val schema = Encoders.product[T].schema
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv(fileName)
The type T is a subtype of Product, i.e. a case class. This works, but it doesn't check whether the column names are correct, so I can provide another file and, as long as the data types are correct, no error occurs and I am unaware that the user provided the wrong file that by coincidence has the correct data types in the proper order.
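For context, a minimal sketch of how such a reader could be parameterized over T (the helper name and exact signature are my own assumption, not part of the question):

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Hypothetical generic reader: T must be a case class (Product) with a TypeTag
// so that Encoders.product[T] can derive its schema.
def readTyped[T <: Product : TypeTag](spark: SparkSession, fileName: String): Dataset[T] = {
  implicit val enc = Encoders.product[T]
  spark.read
    .schema(enc.schema)           // enforces the expected column types
    .option("header", "true")     // skips the header row, but does NOT validate names
    .csv(fileName)
    .as[T]
}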
I tried using the option that infers the schema and then calling the .as[T] method on the Dataset, but when any column other than a String column contains only nulls, Spark interprets it as a String column, whereas in my schema it is an Integer. So a cast exception occurs, even though the column names have been checked correctly.
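A sketch of that inference-based attempt, as I reconstruct it from the description above (not verbatim code from the question):

val ds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")  // column names and types come from the file itself
  .csv(fileName)
  .as[T]  // resolves columns by name, but an all-null column inferred as String
          // cannot be up-cast to the Integer field of T, hence the exception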
To summarize: I found one solution that ensures correct data types but does not validate headers, and another solution that validates headers but has problems with data types. Is there any way to achieve both, i.e. complete validation of headers and types?
I am using Spark 2.2.0.
Upvotes: 8
Views: 9999
Reputation: 37832
Looks like you'll have to do it yourself by reading the file header twice.
Looking at Spark's code, the inferred header is completely ignored (never actually read) if a user supplies their own schema, so there's no way of making Spark fail on such an inconsistency.
To perform this comparison yourself:
import org.apache.spark.sql.Encoders

val schema = Encoders.product[T].schema

// Read the actual schema; this shouldn't be too expensive, as Spark's
// laziness avoids actually reading the entire file.
val fileSchema = spark.read
  .option("header", "true")
  .csv("test.csv")
  .schema

// Read the file using your own schema. You can use this DF later.
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv("test.csv")

// Compare actual and expected column names:
val badColumnNames = fileSchema.fields.map(_.name)
  .zip(schema.fields.map(_.name))
  .filter { case (actual, expected) => actual != expected }

// Fail if any inconsistency is found:
assert(badColumnNames.isEmpty,
  s"file schema does not match expected; bad column names: ${badColumnNames.mkString("; ")}")
Upvotes: 9