Reputation: 381
Using autoloader, I am reading some continues data from storage to Databricks Delta Live table. The declaration of data pipeline is as follows.
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
sch = "StructType([StructField('Date', StringType(), True), StructField('machine', StringType(), True), StructField('temperature', DecimalType(), True), StructField('time', StringType(), True)])"
@dlt.create_table(
comment="The raw machine data, ingested from azure storage.",
table_properties={
"myCompanyPipeline.quality": "raw",
"pipelines.autoOptimize.managed": "true"
}
)
def test_raw():
return (spark.readStream.format("cloudFiles").option("schema",sch).option("cloudFiles.schemaLocation", "/FileStore/schema").option("cloudFiles.format", "json").load("..../"))
And dataset I am reading from storage as below.
{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}
{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}
{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}
But unfortunately, the pipeline is not failing for wrong "temperature" data (Non Decimal) and pipeline is processing all records successfully. Ideally this should get failed because temperature column is defined as Decimal data type.
Can someone please help, why this schema enforcement not working.
Upvotes: 1
Views: 1017
Reputation: 8160
But unfortunately, the pipeline is not failing for wrong "temperature" data (Non Decimal) and pipeline is processing all records successfully. Ideally this should get failed because temperature column is defined as Decimal data type.
You won't get error whenever there is mismatch between schema and data type, it simply makes it has null
when there is type mismatch.
Note: It makes null
only when you are running pipeline first time. If you already having table with column of different type and provided schema is of different type then you will get error as below.
For json
file type it takes everything as string if you don't provide the schema properly.
That is you provided schema in option("schema",sch)
instead of schema(sch)
So, you won't get any error and it takes everything as string.
Upvotes: 0
Reputation: 381
The problem has been resolved after applying
spark.readStream.format("cloudFiles").schema(sch)
in place of
spark.readStream.format("cloudFiles").option("schema",sch)
Upvotes: 1