S. K

Reputation: 505

Does Spark load the base RDD into memory?

I am new to Spark and need help understanding how it works. Suppose README.md is stored in HDFS as 3 blocks (128 MB block size) across 3 nodes, and I am using the Spark shell to process it.

val textFile = sc.textFile("README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.first()

In the above case, execution will be triggered by line 3 (the call to first()).

Will Spark load all 3 splits of README.md into the HDFS nodes' RAM, filter them to produce linesWithSpark, keep the result in memory for a moment, and then send the first line of linesWithSpark (from the 1st split) to the driver? Or will it just pull the first line containing "Spark" from split 1 on the HDFS node's disk and send it to the driver?

What changes in the processing if I change the 2nd line to

val linesWithSpark = textFile.filter(line => line.contains("Spark")).cache()

Upvotes: 2

Views: 729

Answers (1)

zero323

Reputation: 330073

Let's start with a simple experiment. First, let's load the data and check its distribution:

val textFile = sc.textFile("README.md", 2)
textFile.glom.map(_.size).collect  // number of records per partition
// Array[Int] = Array(54, 41)

As we might suspect, a simple filter results in a single stage (narrow transformations only, no shuffle):

textFile.filter(line => line.contains("Spark")).toDebugString
// String = 
// (2) MapPartitionsRDD[11] at filter at <console>:30 []
//  |  MapPartitionsRDD[8] at textFile at <console>:27 []
//  |  README.md HadoopRDD[7] at textFile at <console>:27 []

Now let's run this job without cache and collect some diagnostic information:

val cnt = sc.accumulator(0L, "cnt")  // counts every record the filter sees

val linesWithSpark = textFile.filter(line => {
  cnt += 1L
  line.contains("Spark")
})

linesWithSpark.first()
// String = # Apache Spark
cnt.value
// Long = 1

As you can see, the job without cache processes only a single record. This happens because first is implemented as take(1). In the first iteration take runs the job on only one partition and calls it.take(left) on its iterator, where left equals one.

Since iterators are lazy, our program returns immediately after processing the first line. If the first partition doesn't provide the required results, take iterates, increasing the number of partitions processed with each pass.
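
To make this concrete, here is a simplified, hypothetical sketch of that strategy (this is not Spark's actual source; the function name, the growth factor of 4, and the plain Seq of iterators are illustrative assumptions):

def takeSketch[T](partitions: Seq[Iterator[T]], n: Int): Seq[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  var scanned = 0          // partitions processed so far
  var numToScan = 1        // the first iteration touches a single partition
  while (buf.size < n && scanned < partitions.size) {
    partitions.slice(scanned, scanned + numToScan).foreach { it =>
      val left = n - buf.size    // records still missing
      buf ++= it.take(left)      // lazy: stops as soon as `left` records are read
    }
    scanned += numToScan
    numToScan *= 4               // scan more partitions in the next round
  }
  buf.toSeq
}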

Next, let's repeat the same experiment with cache:

val cacheCntCache = sc.accumulator(0L, "cacheCnt")

val linesWithSparkCached = textFile.filter(line => {
  cacheCntCache += 1L
  line.contains("Spark")
}).cache()

linesWithSparkCached.first()
// String = # Apache Spark
cacheCntCache.value
// Long = 54

Let's also check the storage info:

sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array(
//   RDD "MapPartitionsRDD" (12)
//   StorageLevel: StorageLevel(false, true, false, true, 1); 
//   CachedPartitions: 1; TotalPartitions: 2; MemorySize: 1768.0 B; 
//   ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)
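
For reference, the five fields printed in StorageLevel(false, true, false, true, 1) are, in constructor order, (useDisk, useMemory, useOffHeap, deserialized, replication), i.e. the plain MEMORY_ONLY level that cache() uses. You can confirm this in the shell (the printed form may differ between Spark versions):

import org.apache.spark.storage.StorageLevel
StorageLevel.MEMORY_ONLY
// StorageLevel(false, true, false, true, 1)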

As you can see, with cache Spark finishes processing the partition and caches it in memory. While I cannot point to the exact part of the source responsible for this behavior, it looks like a reasonable optimization: since the partition is already loaded, there is no reason to stop the job.
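
As a quick follow-up check (the exact count depends on your copy of README.md), calling first() again should now be served from the cached partition, so the filter, and with it the accumulator, should not run again:

linesWithSparkCached.first()
// String = # Apache Spark
cacheCntCache.value
// Long = 54 (expected to stay unchanged if the partition is served from the cache)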

See also: Lazy foreach on a Spark RDD

Upvotes: 1
