Reputation: 11
I want to write a dataset/dataframe to a CSV after performing several transformations (a union) on the original dataset/dataframe. The resulting dataset/dataframe displays perfectly without any problem, but when I try to write it to the CSV it throws the following error:
Caused by: java.io.FileNotFoundException: File file:/../file.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
If I don't perform the union transformation on the original dataset/dataframe, no exception is thrown and the dataset/dataframe is written to the CSV perfectly.
I have tried to refresh the table, but no table exists in my database. I suppose I should create one, but I don't know how to do it; I have received several errors when creating it with:
spark.catalog.createTable("newTable", "data/temporaryBasis")
This is my code to read the dataset/dataframe and union them:
var data = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("data/dataset/mammography_id.csv")
  .drop("ID")

var dataTemporary = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("data/temporaryBasis")
  .drop("ID")

for (d <- dataTemporary.columns)
  if (d.contains("_bin"))
    dataTemporary = dataTemporary.drop(d)

data = dataTemporary.union(data).withColumn("ID", monotonically_increasing_id())
and this is my code to write the dataset/dataframe:
val result = data
result.withColumn("features", stringify(result.col("features")))
  .write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("data/temporaryBasis")
I would appreciate the help :)
Upvotes: 1
Views: 1308
Reputation: 733
This issue happens because of Spark's DAG and lazy evaluation. On any action, Spark reads the DAG, tries to optimize it, and finally runs it.
The problem here is that you are reading from a path and writing to that same path while Spark still needs to load data from it: the overwrite deletes the source directory before the read has finished, so the input files disappear mid-job.
In this case you have two solutions:

1. Cache the dataTemporary dataframe, so the data is materialized before the overwrite:

val rawDF: Dataset[Row] = spark.read.parquet(DATASET_PATH).cache()
val transformedDF: Dataset[Row] = rawDF.transform(...)
transformedDF.write.mode("overwrite").parquet(DATASET_PATH)

2. Write the result to a temporary path first, then replace the original directory:

val rawDF: Dataset[Row] = spark.read.parquet(DATASET_PATH)
val transformedDF: Dataset[Row] = rawDF.transform(...)
transformedDF.write.mode("overwrite").parquet(TEMP_PATH)
Filesystem.delete(DATASET_PATH) // DUMMY CODE
Filesystem.move(TEMP_PATH, DATASET_PATH) // DUMMY CODE
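As a minimal sketch, here is how the caching approach could be applied to the CSV code from the question (names like `otherDF` are placeholders, not from the original post). One important caveat: `cache()` is itself lazy, so an action such as `count()` is needed to actually pull the data into memory before the overwrite starts.

```scala
// Sketch only, assuming the question's paths and an existing SparkSession `spark`.
// cache() alone does nothing until an action runs; count() forces Spark to
// read and materialize the data BEFORE the same directory is overwritten.
val cached = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/temporaryBasis")
  .cache()
cached.count() // action: materializes the cached rows

val merged = cached.union(otherDF) // otherDF stands in for the question's `data`
merged.write
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("data/temporaryBasis")
```

Note that caching is best-effort: if partitions are evicted under memory pressure, Spark will try to recompute them from the (now deleted) source, so the second solution (write to a temp path, then move) is the safer of the two.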
Upvotes: 1