Kieran

Reputation: 951

Schema Changes not Allowed on Delta Live Tables Full Refresh

I have a simple Delta Live Tables pipeline that performs a streaming read of multiple CSV files from cloudFiles (S3 storage) into a Delta table published to the Hive metastore.

I have two requirements that make my situation more complex/unique:

  1. I need to use the skipRows option of Auto Loader due to the format of the CSV files. This necessitates using the preview channel of the Databricks runtime (v11.3 at the time of writing).
  2. I need to set the table property delta.columnMapping.mode to name, as the CSV data has characters in its column names that are not natively allowed by Delta/Parquet (a minimal illustration follows this list).
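
To illustrate the second requirement, here is a minimal sketch (the table name demo_with_mapping and the sample data are made up, not from my pipeline): Delta rejects column names containing any of the characters " ,;{}()\n\t=" unless column mapping is enabled.

df = spark.createDataFrame([(1, "a")], ["id", "Samples (Max)"])

# Fails with an AnalysisException about invalid character(s) in "Samples (Max)"
# df.write.format("delta").saveAsTable("demo_no_mapping")

# Succeeds once column mapping is enabled via table properties
spark.sql("""
  CREATE TABLE demo_with_mapping (id BIGINT, `Samples (Max)` STRING)
  USING DELTA
  TBLPROPERTIES (
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5',
    'delta.columnMapping.mode' = 'name'
  )
""")
df.write.format("delta").mode("append").saveAsTable("demo_with_mapping")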

Both of the above seem to be preview/beta features, so perhaps the behavior I'm observing is a bug.

My pipeline is defined as follows:

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

s3_url = "s3://<path_to_csvs>"

@dlt.table(
    comment="...",
    table_properties={
        'delta.minReaderVersion' : '2',   # column mapping requires reader v2+
        'delta.minWriterVersion' : '5',   # column mapping requires writer v5+
        'delta.columnMapping.mode' : 'name',
        'quality': 'bronze'
    }
)
def bronze_my_csv_data_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("skipRows", 1)  # skip the first line of each file before the header
      .option("header", "true")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
      .option("pathGlobFilter", "*.csv")
      .load(s3_url)
  )

This works as intended when the pipeline is set up and run for the first time, but even without making any code changes, re-running the pipeline with "Full refresh all" (to refresh all the data) gives the following error:

com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException: 
Schema change is detected:

old schema:
root


new schema:
root
 |-- TIMESTAMP: string (nullable = true)
 |-- RECORD: string (nullable = true)
 |-- Samples_Max: string (nullable = true)
 ...

Schema changes are not allowed during the change of column mapping mode.

This happens even if I change the target table name to create a fresh, empty table. Once it has happened, the same error occurs even in a regular (not full refresh) run.

Any help would be appreciated.

Upvotes: 1

Views: 3348

Answers (1)

idiocache

Reputation: 31

I had exactly the same problem, but the only special characters in my column names were spaces, so I just replaced them with underscores:

import dlt
from pyspark.sql.functions import col

# table_name, cloud_files_options and source are defined elsewhere in my pipeline
@dlt.table(name=table_name)
def table():
    df = spark.readStream.format("cloudFiles").options(**cloud_files_options).load(source)

    # Replace all spaces in column names with underscores
    df = df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])

    return df

This is obviously not ideal because my raw table is no longer "raw", but at least I can recompute my entire pipeline from scratch now.

EDIT - don't forget to remove delta.columnMapping.mode from your table properties if you decide to go down this route; a sketch of the full adjusted table definition follows.
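
For reference, here is roughly how the bronze table from the question could look with this workaround applied. It reuses the question's reader options and s3_url with the column mapping properties dropped; I haven't run it against the original data, so treat it as a starting point:

import dlt
from pyspark.sql.functions import col

s3_url = "s3://<path_to_csvs>"

@dlt.table(
    comment="...",
    table_properties={'quality': 'bronze'}  # no delta.columnMapping.mode needed
)
def bronze_my_csv_data_raw():
    df = (
        spark.readStream.format("cloudFiles")
          .option("skipRows", 1)
          .option("header", "true")
          .option("cloudFiles.includeExistingFiles", "true")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
          .option("pathGlobFilter", "*.csv")
          .load(s3_url)
    )
    # Rename columns so the table no longer needs column mapping
    return df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])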

Upvotes: 1
