Reputation: 5000
Small question regarding Apache Spark please.
I have a very simple Spark job (written here in Java, but applicable to other languages):
final SparkSession sparkSession = SparkSession.builder().getOrCreate();
final Dataset<Row> someVeryBigDataSet = sparkSession.read().format("org.apache.spark.sql.cassandra").options(properties).load();
final Dataset<Integer> integerDataSet = someVeryBigDataSet.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT());
final Dataset<Integer> goodIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0);
final Dataset<Integer> badIntegerDataSet = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0);
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodIntegerDataSet.count() + " " + badIntegerDataSet.count());
sparkSession.stop();
The job is very simple:
1. Read a very big dataset from Cassandra.
2. Map each row to an integer, with a very complex and heavy computation that should be done only once per row.
3. Keep only the good integers (equal to 0).
4. Keep only the bad integers (different from 0).
5. Count both datasets and log the result.
The issue is that I see the map function from step 2 being performed multiple times, for each and every row of the database.
My theory (please correct me if I am wrong) is that it is computed a first time at the map call in step 2, but when the two counts are requested, each of the filter steps re-triggers the pipeline and needs the result of step 2 again.
Since the map function should be run only once, how can I avoid this please?
Thank you
Upvotes: 0
Views: 1603
Reputation: 987
Both persist() and cache() are Spark optimization techniques used to store the data. The only difference is that cache() always uses the default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames/Datasets), whereas with persist() the developer can choose the storage level explicitly, in memory and/or on disk.
#cache DF with the default storage level
df.cache()
To check whether the DataFrame is cached or not, we can use df.is_cached or df.storageLevel.useMemory. Both return a boolean value, True or False.
#persist dataframe with default storage-level
df.persist()
#persist dataframe with MEMORY_AND_DISK_2
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
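Since the question uses the Java API, here is a minimal sketch of the equivalent calls (assuming a Dataset<Row> named df; the storage levels come from org.apache.spark.storage.StorageLevel):
import org.apache.spark.storage.StorageLevel;
// cache with the default storage level
df.cache();
// persist with an explicit storage level (memory and disk, replicated twice)
df.persist(StorageLevel.MEMORY_AND_DISK_2());
// check whether the cached data is held in memory
boolean inMemory = df.storageLevel().useMemory();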
Upvotes: 0
Reputation: 5068
You can use the .cache() method when creating integerDataSet:
final Dataset<Integer> integerDataSet = someVeryBigDataSet
.map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
.cache();
It will persist your Dataset in memory (spilling to disk if there is not enough memory), and every time you reuse this Dataset, the persisted data is loaded instead of being recomputed.
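Applied to the pipeline from the question, a minimal sketch (same variable names as in the question; the unpersist() call at the end is optional and simply frees the cached blocks once both counts are done):
final Dataset<Integer> integerDataSet = someVeryBigDataSet
    .map((MapFunction<Row, Integer>) row -> someSuperComplexAndHeavyComputationThatShouldBeDoneOnlyOnceToConvertRowToInteger(row), Encoders.INT())
    .cache();
// the first count materializes the cache; the second count reads the cached rows instead of recomputing the map
final long goodCount = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger == 0).count();
final long badCount = integerDataSet.filter((FilterFunction<Integer>) oneInteger -> oneInteger != 0).count();
LOGGER.info("good integer dataset size and bad integer dataset size:\n" + goodCount + " " + badCount);
// release the cached blocks once they are no longer needed
integerDataSet.unpersist();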
More details on caching strategies: https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/
Upvotes: 1