Mike

Reputation: 593

Error when applying actions on a Spark DataFrame

I am new to the Spark framework and am working on some (relatively) small tasks on my local machine to practice. My task is the following: I have 365 zipped CSV files stored in S3 that contain daily logs, and I am supposed to construct a DataFrame of the whole year. My method was to retrieve the keys from the bucket, build daily DataFrames, unify them into monthly DataFrames, do the same with those, and get a full-year DataFrame in return.

That worked on some sample data I retrieved for testing. Before building each DataFrame, I unzipped the file, wrote the uncompressed CSV to disk, and used that to create the DataFrame.

The problem: if I delete the CSV file from disk (to make it temporary) after I create the DataFrame, I can't perform any actions on the DataFrame (year_df.count(), for instance); Spark throws the exception:

"Job aborted due to stage failure: .... java.io.FileNotFoundException: File .csv does not exist"

After some searching on SO, I found that the reason may be the metadata Spark uses when applying SQL queries over DataFrames (External Table not getting updated from parquet files written by spark streaming). I changed

spark.sql.parquet.cacheMetadata

by running spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate(), and made sure that spark.conf.get("spark.sql.parquet.cacheMetadata") returned false.

I could not find any solution. Of course, unzipping all the files into S3 would work, but that is not useful for me.

Thanks!

Upvotes: 1

Views: 1325

Answers (2)

stevel

Reputation: 13430

cache() is still just a hint: Spark may need to recalculate the values after a failure, or simply because the cached data was discarded under cache pressure. If you want to delete the source data, make sure you have written your results out first and really will have no need of the source data ever again.
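
For instance, a PySpark sketch of that order of operations (paths and bucket names are placeholders, not your actual locations):

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the year's DataFrame from the temporary, unzipped CSVs
year_df = spark.read.csv("/tmp/unzipped/*.csv", header=True)

# Materialize the result somewhere durable first...
year_df.write.mode("overwrite").parquet("s3a://my-bucket/logs/year_2017/")

# ...and only then drop the temporary source files
for name in os.listdir("/tmp/unzipped"):
    os.remove(os.path.join("/tmp/unzipped", name))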

I would actually recommend moving off CSV to a columnar format (ORC or Parquet) and compressing with Snappy. It is much more efficient for processing, especially with predicate push-down.
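
A rough PySpark sketch of that one-off conversion (again, paths are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the daily CSVs once and rewrite them as Snappy-compressed Parquet
logs_df = spark.read.csv("/tmp/unzipped/*.csv", header=True)
logs_df.write.option("compression", "snappy").parquet("s3a://my-bucket/logs-parquet/")

Subsequent jobs can then read the Parquet copy straight from S3 with spark.read.parquet(...), with no temporary local files involved.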

Upvotes: 0

T. Gawęda

Reputation: 16076

Spark performs actions lazily.

That means you can apply a few transformations, but the file will only be read when you invoke an action.

It works the same way on Datasets as on RDDs, since Datasets are backed by RDDs.

Consider this code:

val df = sqlContext.read.csv("data.csv") // declare the source (path is illustrative); nothing is scanned yet
val query = df.groupBy("x").count()

query.show() // here the data will be read

So if you delete the file before it is actually read, Spark will throw an exception. You can force reading by performing an action, e.g. take or show. The data will be cached if you call cache():

val df = sqlContext.read.csv("data.csv") // declare the source (path is illustrative)
val query = df.groupBy("x").count().cache()

query.show() // here the data will be read and cached

Upvotes: 1
