Boris

Reputation: 886

Why is Databricks Auto Loader failing to merge new columns with schema evolution?

I'm using Databricks Auto Loader to load parquet files:

from pyspark.sql.functions import col as spark_col, current_timestamp, current_date

def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns):
    # Configure Auto Loader to ingest parquet data into a Delta table
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load(latest_file_location)
        # Rename all columns to the cleaned names (incl. _rescued_data)
        .toDF(*new_columns)
        # Add lineage columns: source file path and processing timestamps
        .select("*",
                spark_col("_metadata.file_path").alias("source_file"),
                current_timestamp().alias("processing_time"),
                current_date().alias("processing_date"))
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .trigger(once=True)
        .toTable(table_name))

While reading the stream I have to rename the columns, because some of them contain problematic characters; hence the new_columns parameter above. This is how I build the new column names:

from pyspark.sql.functions import lit

def set_new_schema(latest_file_location):
    ## Read the files once in batch mode to infer the schema and build the
    ## list of cleaned column names (special characters replaced or removed)
    df = spark.read.option("recursiveFileLookup", "true").parquet(latest_file_location)
    # Dummy _rescued_data column so the name list matches Auto Loader's output
    df = df.withColumn("_rescued_data", lit(""))
    new_columns = [col.name.replace(" ", "_").replace("-", "_").replace(".", "_")
                   .replace(",", "").replace(";", "").replace("{", "").replace("}", "")
                   .replace("(", "").replace(")", "").replace("\n", "").replace("\t", "")
                   .replace("=", "")
                   for col in df.schema]
    return new_columns
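
For completeness, this is roughly how I wire the two functions together (the table name and paths below are just placeholders):

# Placeholder paths/names, only to show how new_columns flows into the stream
source_path = "/mnt/raw/my_source/"
new_columns = set_new_schema(source_path)
run_autoloader(
    table_name="bronze.my_table",
    checkpoint_path="/mnt/checkpoints/my_table",
    latest_file_location=source_path,
    new_columns=new_columns,
)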

The stream fails with the expected schema-change exception, but for some reason Auto Loader never records the updated schema in the schema location, so when the job retries I hit the exact same error again. If I drop the rename step (.toDF(*new_columns)), schema evolution works fine, so this step is somehow breaking the stream, but I can't figure out why.
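
In case it's relevant, an equivalent rename using explicit aliases instead of toDF would look like the sketch below (untested, and df here stands for the streaming DataFrame returned by load; I don't know whether it interacts with schema evolution any differently):

# Untested sketch: alias each inferred column to its cleaned name
# instead of swapping all names at once with toDF
df = df.select([spark_col(old).alias(new)
                for old, new in zip(df.columns, new_columns)])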

Has anyone had to deal with this issue before? If so, could you suggest a workaround for changing the column names before writing the stream?

Upvotes: 0

Views: 10

Answers (0)
