Boris

Reputation: 886

Why is schemaEvolution not working in Databricks Auto Loader?

I'm reading CSV files and processing them daily so I can append the data to my bronze layer in Databricks using Auto Loader. The code looks like this:

    def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns):
        # Configure Auto Loader to ingest parquet data to a Delta table
        (spark.readStream
            .format("cloudFiles")
            #.schema(df_schema)
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaLocation", checkpoint_path)
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .load(latest_file_location)
            .toDF(*new_columns)
            .select("*",
                    spark_col("_metadata.file_path").alias("source_file"),
                    current_timestamp().alias("processing_time"),
                    current_date().alias("processing_date"))
            .writeStream
            .option("checkpointLocation", checkpoint_path)
            .trigger(once=True)
            .option("mergeSchema", "true")
            .toTable(table_name))

Previously this was able to handle evolving schemas, but today, after the introduction of a new column in the input CSVs, I got the following error:

    requirement failed: The number of columns doesn't match.

I've read some posts suggesting either editing the schema manually or resetting it by deleting the schema checkpoint path. The first would require ongoing manual maintenance and the second would mean wiping all our bronze data, so for now neither is an option, especially if it's only a temporary fix.
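For context, by "editing the schema manually" I mean maintaining an explicit schema by hand and passing it through the .schema(df_schema) line that is currently commented out in the code above, roughly like this (the column names and types are just placeholders for our actual bronze columns):

    from pyspark.sql.types import StructType, StructField, StringType

    # Hand-maintained schema: every new source column would have to be added
    # here by hand before the daily run, which is exactly the maintenance
    # burden I'm trying to avoid.
    df_schema = StructType([
        StructField("existing_col_1", StringType(), True),
        StructField("existing_col_2", StringType(), True),
        StructField("newly_added_col", StringType(), True),
    ])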

I don't understand why this suddenly started happening, as this is specifically what Auto Loader was designed to do.

Any help would be much appreciated.

Upvotes: 0

Views: 56

Answers (1)

jhhhn12

Reputation: 26

Can you clarify in your question whether you are attempting to read Parquet or CSV? In the code snippet you provided, you are specifying the format as Parquet: .option("cloudFiles.format", "parquet"). If you are trying to read CSV files using Auto Loader, you should specify the format as csv.

  1. For CSV files, you need to set cloudFiles.inferColumnTypes to true if you want Auto Loader to infer the column datatypes. It defaults to false, as specified in the documentation linked below.
  2. Double-check that checkpoint_path contains both the inferred schema information and the checkpoint information (see the quick check after this list).
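A quick way to verify the second point (assuming you are running this in a Databricks notebook, where dbutils is available) is to list the schema location; Auto Loader keeps the inferred schemas it tracks in a _schemas folder inside it:

    # List the checkpoint/schema location. You should see the streaming
    # checkpoint folders plus a _schemas directory containing the versioned
    # schemas Auto Loader has inferred so far.
    display(dbutils.fs.ls(checkpoint_path))
    display(dbutils.fs.ls(f"{checkpoint_path}/_schemas"))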

Referencing this documentation:

    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.inferColumnTypes", "true")  # check docs for explanation
        .load(latest_file_location)
        .toDF(*new_columns)
        .select("*",
                spark_col("_metadata.file_path").alias("source_file"),
                current_timestamp().alias("processing_time"),
                current_date().alias("processing_date"))
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .option("mergeSchema", "true")
        .toTable(table_name))
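Also worth noting: with cloudFiles.schemaEvolutionMode set to addNewColumns, the documented behavior is that the stream stops with an UnknownFieldException the first time it encounters new columns; the new columns are recorded in the schema location, and the next run picks them up. Since you are using trigger(once=True) on a daily schedule, one failed run right after a new column appears is expected, and the following run should continue with the evolved schema.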

Upvotes: 0
