Reputation: 409
I tried to merge two files in a Data Lake using Scala in Databricks and save the result back to the Data Lake using the following code:
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("adl://xxxxxxxx/Test/CSV")

df.coalesce(1).write
  .format("com.databricks.spark.csv")
  .mode("overwrite")
  .option("header", "true")
  .save("adl://xxxxxxxx/Test/CSV/final_data.csv")
However, final_data.csv is saved as a directory containing multiple files rather than as a single file, and the actual .csv file inside it is named 'part-00000-tid-dddddddddd-xxxxxxxxxx.csv'.
How do I rename this file so that I can move it to another directory?
Upvotes: 3
Views: 27769
Reputation: 21
Rename a file stored in ADLS Gen2 from Azure Databricks:
We can use the rename (move) or copy method for this. Since the part file starts with part-0000 and ends with .csv, we can use that to locate it and rename it to data.csv:
from pyspark.sql.functions import col

source_path = "abfss://[email protected]/sample/final_data/"
new_name = "abfss://[email protected]/sample/output/data.csv"

# List the output directory and pick out the part file ending in .csv
getname = dbutils.fs.ls(source_path)
df_filelist = spark.createDataFrame(getname)
filename = df_filelist.filter(col("name").like("%.csv%")).select("name").collect()[0][0]

# Move (rename) the part file to the new name, then remove the old directory
old_name = source_path + '/' + filename
dbutils.fs.mv(old_name, new_name)
dbutils.fs.rm(source_path + '/', True)
Upvotes: 1
Reputation: 1
Python
y = "dbfs:/mnt/myFirstMountPoint/apltperf/Shiv/Destination"
df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(x+"/")
df.repartition(1).write.format("csv").mode("overwrite").save(y+"/"+"final_data.csv")
spark.conf.set('x', str(x)) spark.conf.set('y', str(y))
Scala
var x = spark.conf.get("x")
var y = spark.conf.get("y")
dbutils.fs.ls(x).filter(file => file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path, true))
dbutils.fs.mv(dbutils.fs.ls(y + "/" + "final_data.csv").filter(file => file.name.startsWith("part-00000"))(0).path, y + "/" + "data.csv")
dbutils.fs.rm(y + "/" + "final_data.csv", true)
Upvotes: -1
Reputation: 409
Got it. The file can be renamed and moved to another destination using the following code. The source files that were merged are also deleted.
val x = "Source"
val y = "Destination"
val df = sqlContext.read.format("csv")
.option("header", "true").option("inferSchema", "true")
.load(x+"/")
df.repartition(1).write.
format("csv").
mode("overwrite").
option("header", "true").
save(y+"/"+"final_data.csv")
dbutils.fs.ls(x).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(y+"/"+"final_data.csv").filter(file=>file.name.startsWith("part-00000"))(0).path,y+"/"+"data.csv")
dbutils.fs.rm(y+"/"+"final_data.csv",true)
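For reuse, the same steps can be wrapped in a small helper. The sketch below is not part of the original answer: it assumes a Databricks notebook where dbutils and the SparkSession are available, and the function name, parameter names and temporary directory are placeholders of my own.

import org.apache.spark.sql.DataFrame

// Sketch: write df as a single CSV file named fileName inside destDir
// (assumes dbutils is available, i.e. a Databricks notebook)
def writeSingleCsv(df: DataFrame, destDir: String, fileName: String): Unit = {
  val tmpDir = destDir + "/_tmp_single_csv"  // hypothetical temporary output directory

  // Spark always writes a directory of part files, so write there first
  df.repartition(1).write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(tmpDir)

  // Pick the single part file out of the directory and move it to the final name
  val partFile = dbutils.fs.ls(tmpDir).filter(_.name.startsWith("part-")).head.path
  dbutils.fs.mv(partFile, destDir + "/" + fileName)

  // Remove the leftover directory (remaining marker files such as _SUCCESS)
  dbutils.fs.rm(tmpDir, true)
}

With the x and y above, it could then be called as writeSingleCsv(df, y, "data.csv").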
Upvotes: 7