Reputation: 951
I have a simple Delta Live Tables pipeline that performs a streaming read of multiple csv files from cloudFiles (s3 storage) into a delta table published to the hive metastore.
I have two requirements that make my situation more complex/unique:
1. The skipRows parameter must be passed to autoLoader due to the format of the csv files. This necessitates using the preview channel of the Databricks runtime (v11.3 at the time of writing) (source).
2. The columnMapping.mode table property must be set to name, as the csv data has characters in column names that are not allowed natively by Delta / Parquet (source).
Both of the above seem to be preview/beta features, so perhaps the behavior I'm observing is a bug.
My pipeline is defined as follows:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
s3_url = "s3://<path_to_csvs>"
@dlt.table(
    comment="...",
    table_properties={
        'delta.minReaderVersion' : '2',
        'delta.minWriterVersion' : '5',
        'delta.columnMapping.mode' : 'name',
        'quality': 'bronze'
    }
)
def bronze_my_csv_data_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("skipRows", 1)
        .option("header", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("pathGlobFilter", "*.csv")
        .load(s3_url)
    )
This works as intended when the pipeline is set up and run for the first time, but even without making any code changes, re-running the pipeline with "Full refresh all" (to refresh all the data) gives the following error:
com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException:
Schema change is detected:
old schema:
root
new schema:
root
|-- TIMESTAMP: string (nullable = true)
|-- RECORD: string (nullable = true)
|-- Samples_Max: string (nullable = true)
...
Schema changes are not allowed during the change of column mapping mode.
This happens even if I change the target table name to create a fresh, empty table. Once it has happened, the same error occurs even in a regular (not full refresh) run.
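For reference, the properties that DLT applied to the published table can be inspected from a regular notebook; this is only a quick sketch, and my_schema is a placeholder for the target schema the pipeline publishes to:

# Hypothetical check of the properties DLT set on the bronze table after the
# first successful run; "my_schema" is a placeholder for the target schema.
props = spark.sql("SHOW TBLPROPERTIES my_schema.bronze_my_csv_data_raw")

# This should list delta.columnMapping.mode = 'name' alongside the
# reader/writer versions declared in table_properties above.
props.filter("key LIKE 'delta.%'").show(truncate=False)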
Any help would be appreciated.
Upvotes: 1
Views: 3348
Reputation: 31
I had exactly the same problem. But the only special characters in my column names were spaces, so I just replaced them:
import dlt
from pyspark.sql.functions import col

# table_name, cloud_files_options and source are defined elsewhere in the pipeline
@dlt.table(name=table_name)
def table():
    df = spark.readStream.format("cloudFiles").options(**cloud_files_options).load(source)
    # Replace all spaces with underscores
    df = df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])
    return df
This is obviously not ideal because my raw table is no longer "raw", but at least I can recompute my entire pipeline from scratch now.
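If your column names contain any of the other characters that Delta / Parquet reject (the error message typically lists them as " ,;{}()\n\t="), the same idea can be generalised. This is only a sketch I haven't run against DLT, with df being the streaming DataFrame read above:

import re
from pyspark.sql.functions import col

# Characters Delta / Parquet column names cannot contain: space , ; { } ( ) newline tab =
INVALID = re.compile(r"[ ,;{}()\n\t=]")

def sanitize(name):
    # Replace every disallowed character with an underscore
    return INVALID.sub("_", name)

# df is the streaming DataFrame returned by the cloudFiles read above
df = df.select([col(c).alias(sanitize(c)) for c in df.columns])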
EDIT - don't forget to remove columnMapping.mode from your table properties if you decide to go down this route.
Upvotes: 1