Reputation: 2827
I have a nice function that allows me to overwrite and rename a file when I save the results of a query to ADLS; see the following:
from pyspark.sql import SparkSession
from notebookutils import mssparkutils  # pre-imported in Synapse notebooks; explicit here for clarity

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep names of the files that match the requested type
    if len(filtered_filelist) > 1:  # check if there is more than 1 file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # check if there are no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), changing the name in the process
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)
I usually use this function with Databricks, where I would use dbutils instead of mssparkutils.
Anyhow, as an example, I would use the above function with the following code:
df_staging_ccd_probate = "abfss://[email protected]/RAW/LANDING/"

myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
mssparkutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)
With Databricks this works fine, but with Apache Spark in Azure Synapse I get the following error:
Py4JJavaError: An error occurred while calling z:mssparkutils.fs.mv.
: org.apache.hadoop.fs.PathExistsException: `abfss://[email protected]/RAW/LANDING/filename.parquet': File exists
For some reason the 'overwrite' method doesn't seem to work with Apache Spark in Synapse.
Can someone let me know what the equivalent method is to 'overwrite'? Or am I missing something? Thanks
Just so you know, here is the Databricks version of the function:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = dbutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep names of the files that match the requested type
    if len(filtered_filelist) > 1:  # check if there is more than 1 file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # check if there are no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), changing the name in the process
        dbutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)
The following overwrites every time with Databricks, so it must be something with Synapse that makes it fail:
myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate + "/tempDelta")
rename_file(df_staging_ccd_probate + "/tempDelta", df_staging_ccd_probate, "parquet", "filename")
dbutils.fs.rm(df_staging_ccd_probate + "/tempDelta", True)
Upvotes: 1
Views: 2445
Reputation: 46
You were close. This is how to move the file and allow overwriting.
mssparkutils.fs.mv(source_path, dest_path, overwrite=True)
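For example, the final branch of the rename_file function from the question could be changed like this (a minimal sketch, assuming a Synapse runtime whose mssparkutils.fs.mv accepts the overwrite argument):

    else:
        # overwrite=True replaces any existing file at the destination
        # instead of raising PathExistsException
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0],
                           dest_path + "/" + new_name + "." + file_type,
                           overwrite=True)

With that change, rerunning the save/rename/cleanup sequence should no longer fail when filename.parquet already exists in the landing path.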
Upvotes: 3