Piotr L

Reputation: 1105

DLT: commas treated as part of column name

I am trying to create a STREAMING LIVE TABLE object in my Databricks environment, using an S3 bucket with a bunch of CSV files as the source.

The syntax I am using is:

CREATE OR REFRESH STREAMING LIVE TABLE t1
COMMENT "test table"
 TBLPROPERTIES
 (
   "myCompanyPipeline.quality" = "bronze"
   , 'delta.columnMapping.mode' = 'name'
   , 'delta.minReaderVersion' = '2'
   , 'delta.minWriterVersion' = '5'
 )
AS
SELECT * FROM cloud_files
(
  "/input/t1/"
  ,"csv"
  ,map
   (
    "cloudFiles.inferColumnTypes", "true"
   , "delimiter", ","
   , "header", "true"
   )
)

A sample source file content:

ROW_TS,ROW_KEY,CLASS_ID,EVENT_ID,CREATED_BY,CREATED_ON,UPDATED_BY,UPDATED_ON
31/07/2018 02:29,4c1a985c-0f98-46a6-9703-dd5873febbbb,HFK,XP017,test-user,02/01/2017 23:03,,
17/01/2021 21:40,3be8187e-90de-4d6b-ac32-1001c184d363,HTE,XP083,test-user,02/09/2017 12:01,,
08/11/2019 17:21,05fa881e-6c8d-4242-9db4-9ba486c96fa0,JG8,XP083,test-user,18/05/2018 22:40,,

When I run the associated pipeline, I am getting the following error:

org.apache.spark.sql.AnalysisException: Cannot create a table having a column whose name contains commas in Hive metastore.

For some reason, the loader is not recognizing commas as column separators and is trying to load the whole thing into a single column.

I have already spent a good few hours trying to find a solution. Replacing commas with semicolons (both in the source file and in the "delimiter" option) does not help.

Manually uploading the same file to a regular (i.e. non-streaming) Databricks table works just fine. The issue is solely with the streaming table.
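
For what it's worth, roughly the same check done as a plain batch read in a notebook (a minimal sketch, assuming the same path and options) also splits the columns correctly:

# Plain (non-streaming) batch read of the same files; this parses the
# columns correctly, so the CSV options themselves seem fine.
df = (
  spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .load("/input/t1/")
)
df.printSchema()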

Ideas?

Upvotes: 1

Views: 421

Answers (1)

Piotr L

Reputation: 1105

Not exactly the type of solution I would have expected here, but it seems to work, so...

Rather than using SQL to create the DLT, switching to a Python script helps:

import dlt

# The table name defaults to the function name (t1).
@dlt.table
def t1():
  return (
    spark.readStream.format("cloudFiles")   # Auto Loader source
      .option("cloudFiles.format", "csv")
      .load("/input/t1/")
  )

Note that the above script needs to be executed via a DLT pipeline (running it directly from a notebook will throw a ModuleNotFoundError exception).
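
If the CSV options from the SQL version (header, delimiter, type inference) are still needed, they can presumably be passed as extra .option() calls on the Auto Loader reader; an untested sketch along those lines:

import dlt

@dlt.table(comment="test table")
def t1():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")  # same options as the SQL map()
      .option("header", "true")
      .option("delimiter", ",")
      .load("/input/t1/")
  )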

Upvotes: 1
