PatPanda

Reputation: 5000

Apache Spark: How to "cache" a dataset so it is not re-computed for next computation

Small question regarding Apache Spark please.

I have a very simple Spark job (written here in Java, but applicable to other languages):

final SparkSession     sparkSession       = SparkSession.builder().getOrCreate();
final Dataset<Row>     someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet     = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet  = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();

The job is very simple:

  1. Extract a very big dataset from some big data table
  2. Convert each row into an integer. This uses a very heavy computation, and it should only be performed once.
  3. Separate the good integer results of step 2 from the bad ones, and display both counts

The issue is that I see the map function from step 2 being performed multiple times, for each and every row of the database.

My theory (please correct me if I am wrong) is that the conversion is computed a first time on line 3 of the snippet, inside the map function.

But on lines 4 and 5, in the two filter functions, when the counts are needed, the pipeline needs the result of step 2 all over again.

Since the map function should only be run once, how can I avoid this?

Thank you

Upvotes: 0

Views: 1603

Answers (2)

swapnil shashank

Reputation: 987

Both persist() and cache() are Spark optimization techniques used to store data. The only difference is that cache() uses a fixed default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for Datasets/DataFrames), whereas with persist() the developer can choose the storage level: in memory, on disk, or both.

#cache DF with the default storage level

df.cache()

To check whether the dataframe is cached, we can use df.is_cached or df.storageLevel.useMemory. Both return a boolean value (True or False).
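For the Java API used in the question, a rough equivalent check might be the following (a small sketch; Dataset.storageLevel() is available from Spark 2.1 onwards):

// true if integerDataSet is marked as cached with a storage level that uses memory
final boolean isCachedInMemory = integerDataSet.storageLevel().useMemory();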

#persist dataframe with default storage-level

df.persist()

#persist dataframe with MEMORY_AND_DISK_2

df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
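For the Java API used in the question, roughly equivalent calls might look like this (a sketch, assuming the usual StorageLevel import; the exact storage level shown is just one possible choice):

import org.apache.spark.storage.StorageLevel;

// cache with the default storage level
integerDataSet.cache();

// or persist with an explicitly chosen storage level
integerDataSet.persist(StorageLevel.MEMORY_AND_DISK_2());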

Upvotes: 0

Vincent Doba

Reputation: 5068

You can use the .cache() method when creating integerDataSet:

final Dataset<Integer> integerDataSet = someVeryBigDataSet
  .map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
  .cache();

It will persist your dataframe in memory (or on disk when there is not enough memory), and every time you use this dataframe afterwards, the persisted version is reused instead of being recomputed.
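Putting this together with the code from the question, the whole job could look roughly like this (a sketch reusing the question's own properties, LOGGER and conversion function; the final unpersist() is optional and simply frees the cached blocks):

final SparkSession sparkSession = SparkSession.builder().getOrCreate();

final Dataset<Integer> integerDataSet = sparkSession.read()
  .format("org.apache.spark.sql.cassandra").options(properties).load()
  .map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
  .cache(); // the first action materializes the cache; later actions reuse it

final long goodCount = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0).count();
final long badCount  = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0).count();

LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodCount + " " + badCount);

integerDataSet.unpersist(); // release the cached blocks once both counts are done
sparkSession.stop();

Note that cache() itself is lazy: the heavy map still runs once, when the first count is executed, and the second count then reads the cached integers instead of recomputing them.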

More details on caching strategies: https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/

Upvotes: 1
