Reputation: 43
I want to process some parquet files (with snappy compression) using AutoLoader in Databricks. A lot of those files are empty or contain just one record. Also, I cannot change how they are created, nor compact them.
Here are some of the approaches I tried so far:
I have set the following AutoLoader configurations:
I use the following readStream configurations:
df = (
    spark.readStream.format("cloudFiles")
    .options(**CLOUDFILE_CONFIG)
    .option("cloudFiles.format", "parquet")
    .option("pathGlobFilter", "*.snappy")
    .option("recursiveFileLookup", True)
    .schema(schema)
    .option("locale", "de-DE")
    .option("dateFormat", "dd.MM.yyyy")
    .option("timestampFormat", "MM/dd/yyyy HH:mm:ss")
    .load(<path-to-source>)
)
And the following writeStream configurations:
(
    df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", <path_to_checkpoint>)
    .queryName(<processed_table_name>)
    .partitionBy(<partition-key>)
    .option("mergeSchema", True)
    .trigger(once=True)
    .start(<path-to-target>)
)
My preferred solution would be to use DBX, but I don't understand why the job succeeds yet I only see empty folders in the target location. This is very strange behavior; I suspect AutoLoader is timing out after reading only empty files for some time!
P.S. The same thing also happens when I use plain Parquet Spark streaming instead of AutoLoader.
Do you know of any reason why this is happening and how I can overcome this issue?
Upvotes: 0
Views: 1161
Reputation: 6588
If you are reading files written as Parquet with snappy compression, the file extension is '.snappy.parquet'. You can try changing the pathGlobFilter to match *.snappy.parquet. My second doubt is regarding "cloudFiles.allowOverwrites": True; you may give it a try without this option.
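A minimal sketch of what that could look like, assuming the rest of your CLOUDFILE_CONFIG stays the same and no longer sets cloudFiles.allowOverwrites (adjust the source path and config to your setup):

# Sketch: match the full ".snappy.parquet" extension and test without
# "cloudFiles.allowOverwrites" in CLOUDFILE_CONFIG.
df = (
    spark.readStream.format("cloudFiles")
    .options(**CLOUDFILE_CONFIG)                   # assumed to no longer set cloudFiles.allowOverwrites
    .option("cloudFiles.format", "parquet")
    .option("pathGlobFilter", "*.snappy.parquet")  # was "*.snappy"
    .option("recursiveFileLookup", True)
    .schema(schema)
    .load(<path-to-source>)
)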
Upvotes: 0
Reputation: 33
Are you specifying the schema of the streaming read? (Sorry, can't add comments yet)
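For reference, an explicit schema on the streaming read would look roughly like this (the column names and types below are made up for illustration; replace them with the actual columns of your Parquet files):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical schema; an explicit schema avoids inference over empty/tiny files.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("event_time", TimestampType(), True),
])

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .schema(schema)
    .load(<path-to-source>)
)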
Upvotes: 0