Reputation: 886
I'm using Databricks Auto Loader to load parquet files:
from pyspark.sql.functions import col as spark_col, current_date, current_timestamp

def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns):
    # Configure Auto Loader to ingest parquet data into a Delta table
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load(latest_file_location)
        .toDF(*new_columns)  # rename the columns to the sanitized names
        .select("*",
                spark_col("_metadata.file_path").alias("source_file"),
                current_timestamp().alias("processing_time"),
                current_date().alias("processing_date"))
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .trigger(once=True)
        .toTable(table_name))
While reading the stream, I have to rename the columns because some of them contain problematic characters; hence the new_columns parameter above. This is how I build the new column names:
from pyspark.sql.functions import lit

def set_new_schema(latest_file_location):
    # Read the files once in batch mode just to get the schema
    df = spark.read.option("recursiveFileLookup", "true").parquet(latest_file_location)
    # Add the column Auto Loader appends, so the renamed list lines up with the stream
    df = df.withColumn("_rescued_data", lit(""))
    # Replace or strip the special characters in every column name
    new_columns = [col.name.replace(" ", "_").replace("-", "_").replace(".", "_")
                           .replace(",", "").replace(";", "").replace("{", "").replace("}", "")
                           .replace("(", "").replace(")", "").replace("\n", "").replace("\t", "")
                           .replace("=", "")
                   for col in df.schema]
    return new_columns
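
For context, the two functions are wired together roughly like this; the paths and table name below are placeholders, not my real ones:

# Hypothetical driver code; the actual locations and table name differ
latest_file_location = "/mnt/landing/source/"   # placeholder
checkpoint_path = "/mnt/checkpoints/my_table/"  # placeholder

new_columns = set_new_schema(latest_file_location)
run_autoloader("my_schema.my_table", checkpoint_path, latest_file_location, new_columns)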
The stream crashes as expected when new columns arrive, but for some reason it never writes an updated schema to the schema location, so when the job retries it hits the same error again. If I drop the renaming step (.toDF(*new_columns)), schema evolution works, so that step is somehow breaking the stream, but I can't figure out why.
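
For reference, this is roughly the same stream with only the .toDF(*new_columns) step removed, i.e. the variant where schema evolution behaves correctly (checkpoint_path, latest_file_location, and table_name as above):

# Same stream as in run_autoloader, minus the rename; this evolves the schema fine
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load(latest_file_location)
    .select("*",
            spark_col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"),
            current_date().alias("processing_date"))
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .trigger(once=True)
    .toTable(table_name))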
Has anyone had to deal with this issue before? If so, could you suggest a workaround for changing the column names before writing the stream?
Upvotes: 0
Views: 10