Reputation: 593
I am new to the Spark framework, and working on some small tasks on my local machine to practice. My task is the following: I have 365 zipped csv files stored in S3 that contain daily logs, and I am supposed to construct a dataframe for the whole year. My method was to retrieve the keys from the bucket, build daily dataframes, unify them into monthly dataframes, and then unify those the same way to get a full-year dataframe in return.
That worked on some sample data I retrieved for testing. Before building each DataFrame, I unzipped the file, wrote the uncompressed csv to disk, and used that to create the DataFrame.
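Roughly, the flow looks like the sketch below (the bucket name, key layout and local paths are placeholders, it assumes gzip-compressed files, and the monthly grouping is left out for brevity):

import gzip
import shutil
from functools import reduce

import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
s3 = boto3.client("s3")

bucket = "my-log-bucket"  # placeholder bucket name
keys = [o["Key"] for o in s3.list_objects_v2(Bucket=bucket)["Contents"]]

daily_dfs = []
for key in keys:
    local_gz = "/tmp/" + key.split("/")[-1]   # e.g. 2017-01-01.csv.gz
    local_csv = local_gz[:-3]                 # strip the ".gz" suffix
    s3.download_file(bucket, key, local_gz)
    # unzip and write the uncompressed csv to disk
    with gzip.open(local_gz, "rb") as src, open(local_csv, "wb") as dst:
        shutil.copyfileobj(src, dst)
    daily_dfs.append(spark.read.csv(local_csv, header=True))

# union the daily frames into one full-year frame
year_df = reduce(lambda a, b: a.union(b), daily_dfs)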
The problem: if I delete a csv file from disk (making it temporary) after I create the dataframe, I can't perform any actions on the dataframe (year_df.count(), for instance); it throws a Spark exception:
"Job aborted due to stage failure: .... java.io.FileNotFoundException: File .csv does not exist"
After some searching on SO, I found that the reason may be the metadata that Spark uses when applying SQL queries over the DataFrames (External Table not getting updated from parquet files written by spark streaming). I changed spark.sql.parquet.cacheMetadata by running spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate() and made sure that spark.conf.get("spark.sql.parquet.cacheMetadata") returned false.
I could not find any solution. Of course, unzipping all the files back into S3 would work, but that is not useful for me.
Thanks!
Upvotes: 1
Views: 1325
Reputation: 13430
cache() is still just a hint; Spark may need to recalculate the values after a failure, or simply because the cached data was discarded under cache pressure. If you want to delete the source data, then make sure you've written your results out first and really will have no need of the data ever again.
I would actually recommend moving off CSV onto a columnar format (ORC, Parquet) and compressing with Snappy. That is much more efficient for processing, especially with predicate push-down.
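For example, a minimal sketch of that (paths are placeholders; snappy is already the default Parquet codec in recent Spark releases):

# write the year's data out as Snappy-compressed Parquet before deleting the CSVs
year_df.write \
    .option("compression", "snappy") \
    .parquet("s3a://my-log-bucket/year_logs.parquet")

# from here on, read the Parquet data instead of the original CSV files
year_df = spark.read.parquet("s3a://my-log-bucket/year_logs.parquet")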
Upvotes: 0
Reputation: 16076
Spark evaluates things lazily. That means you can chain a few transformations, but the file will only be read when you invoke an action. It works the same way for Datasets as for RDDs, since Datasets are backed by RDDs.
Consider this code:
val df = sqlContext.read.csv("data.csv") // declare the read (placeholder path); nothing is read yet
val query = df.groupBy('x).count()
query.show() // here the data will be read
So if you delete the file before it is actually read, Spark will throw an exception. You can force the read by performing some action, e.g. take or show. The data will also be kept in memory if you call cache():
val df = sqlContext.read.csv("data.csv") // declare the read (placeholder path); still lazy
val query = df.groupBy('x).count().cache()
query.show() // here the data will be read and cached
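Since the question uses PySpark, a rough equivalent there (the path is a placeholder) would be:

df = spark.read.csv("/tmp/2017-01-01.csv", header=True).cache()
df.count()  # action: forces the read and populates the cache
# only after this is it relatively safe to delete the local csv file

Even then, as the other answer notes, cache() is only a hint, so writing the results out before deleting the source is the safer option.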
Upvotes: 1