Hooberd

Reputation: 173

Spark SQL - reading csv with schema

I ran into a problem while trying to use Spark to simply read a CSV file. After reading it, I would like to ensure that both the data types and the column headers match the schema I provide.

This is the code I use and have problems with:

import org.apache.spark.sql.Encoders

// derive the expected schema from the case class T
val schema = Encoders.product[T].schema
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .csv(fileName)

T is a Product type, i.e. a case class. This works, but it doesn't check whether the column names are correct. If I supply a different file whose columns happen to have the right data types in the right order, no error occurs, and I never learn that the user provided the wrong file.

I also tried the option that infers the schema and then called .as[T] on the resulting Dataset. However, when a non-String column contains only nulls, Spark infers it as a String column, while my schema declares it as Integer. A cast exception then occurs, even though the column names were checked correctly.

To summarize: I found one solution that ensures correct data types but does not validate the headers, and another that validates the headers but has problems with data types. Is there any way to achieve both, i.e. complete validation of headers and types?

I am using Spark 2.2.0.

Upvotes: 8

Views: 9999

Answers (1)

Tzach Zohar

Reputation: 37832

Looks like you'll have to do it yourself by reading the file header twice.

Looking at Spark's code, the inferred header is completely ignored (never actually read) if a user supplies their own schema, so there's no way of making Spark fail on such an inconsistency.

To perform this comparison yourself:

val schema = Encoders.product[T].schema

// read the actual schema; this shouldn't be too expensive, as Spark's
// laziness would avoid actually reading the entire file
val fileSchema = spark.read
  .option("header", "true")
  .csv("test.csv").schema

// read the file using your own schema. You can later use this DF
val df = spark.read.schema(schema)
  .option("header", "true")
  .csv("test.csv")

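// compare actual and expected column names; zipAll pads the shorter side,
// so a differing number of columns is reported too (plain zip would drop it):
val badColumnNames = fileSchema.fields.map(_.name)
  .zipAll(schema.fields.map(_.name), "<missing>", "<missing>")
  .filter { case (actual, expected) => actual != expected }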

// fail if any inconsistency found:
assert(badColumnNames.isEmpty, 
  s"file schema does not match expected; Bad column names: ${badColumnNames.mkString("; ")}")
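
If you need this check in more than one place, the comparison can be pulled out into a small pure function. The following is only a sketch: HeaderCheck and headerMismatches are hypothetical names of mine, not part of Spark, and the comparison is an exact, case-sensitive string match, which may or may not be what you want.

```scala
// Hypothetical helper (not a Spark API): compares header names position by position.
object HeaderCheck {
  // Returns one message per position where the file's column name differs
  // from the expected one; an empty result means the headers match.
  def headerMismatches(actual: Seq[String], expected: Seq[String]): Seq[String] = {
    val missing = "<missing>" // placeholder when one side has fewer columns
    actual.zipAll(expected, missing, missing)
      .zipWithIndex
      .collect {
        case ((a, e), i) if a != e => s"column $i: expected '$e' but found '$a'"
      }
  }
}
```

With the values from the snippet above you would presumably call it as HeaderCheck.headerMismatches(fileSchema.fieldNames, schema.fieldNames) and fail if the result is non-empty. Keeping it free of Spark types makes it easy to unit test without a SparkSession.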

Upvotes: 9
