Reputation: 505
I am new to Spark and need help understanding how it works.
Suppose README.md
is stored in HDFS in 3 blocks (128 MB block size) across 3 nodes, and I am using spark-shell to process it.
val textFile = sc.textFile("README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.first()
In the above case, execution will be triggered by line 3 (linesWithSpark.first()).
Will Spark load all 3 splits of README.md
into the HDFS nodes' RAM, filter them to produce linesWithSpark,
keep the result in memory for a moment, and then send the first line of linesWithSpark
(from the 1st split) to the driver?
Or will it just pull the first line containing "Spark" from split 1 on the HDFS node's disk and send it to the driver?
What changes in the processing if I change the 2nd line to
val linesWithSpark = textFile.filter(line => line.contains("Spark")).cache()
Upvotes: 2
Views: 729
Reputation: 330073
Let's start with a simple experiment. First, let's load the data and check its distribution:
val textFile = sc.textFile("README.md", 2)
textFile.glom.map(_.size).collect
// Array[Int] = Array(54, 41)
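An equivalent check (a sketch of my own, not part of the original experiment) that avoids materializing whole partitions as arrays the way glom does:
textFile.mapPartitions(iter => Iterator(iter.size)).collect
// should return the same per-partition line counts as above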
As we might expect, a simple filter
produces only a single-stage lineage:
textFile.filter(line => line.contains("Spark")).toDebugString
// String =
// (2) MapPartitionsRDD[11] at filter at <console>:30 []
// | MapPartitionsRDD[8] at textFile at <console>:27 []
// | README.md HadoopRDD[7] at textFile at <console>:27 []
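For contrast, here is a hedged sketch (my own addition): introducing a shuffle, for example with groupByKey, should add a second stage to the lineage, visible as an indentation change in toDebugString:
textFile
  .filter(line => line.contains("Spark"))
  .map(line => (line.length, line))
  .groupByKey()
  .toDebugString
// the lineage should now contain a ShuffledRDD marking the stage boundary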
Now let's run this job without cache
and collect some diagnostic information:
val cnt = sc.accumulator(0L, "cnt")
val linesWithSpark = textFile.filter(line => {
  cnt += 1L
  line.contains("Spark")
})
linesWithSpark.first()
// String = # Apache Spark
cnt.value
// Long = 1
As you can see, the job without cache processes only a single record. This happens because first
is executed as take(1)
. In the first iteration, take
runs the job on only one partition and calls it.take(left)
, where left
is equal to one, on that partition's iterator.
Since Iterators
are lazy, our program returns immediately after processing the first line. If the first partition doesn't provide the required results, take
iterates, increasing the number of processed partitions with each iteration.
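To watch that partition scaling happen, here is a minimal sketch (my own addition; the accumulator name is arbitrary): asking for more matches than the file can supply forces take to launch follow-up jobs on further partitions, so the count of processed lines grows past the size of the first partition:
val takeCnt = sc.accumulator(0L, "takeCnt")
textFile.filter(line => {
  takeCnt += 1L
  line.contains("Spark")
}).take(100)
takeCnt.value
// expected to exceed 54, since README.md has fewer than 100 matching lines
// and take has to scan the remaining partition as well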
Next, let's repeat the same experiment with cache:
val cacheCntCache = sc.accumulator(0L, "cacheCnt")
val linesWithSparkCached = textFile.filter(line => {
  cacheCntCache += 1L
  line.contains("Spark")
}).cache()
linesWithSparkCached.first()
// String = # Apache Spark
cacheCntCache.value
// Long = 54
Moreover, let's check the storage info:
sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array(
// RDD "MapPartitionsRDD" (12)
// StorageLevel: StorageLevel(false, true, false, true, 1);
// CachedPartitions: 1; TotalPartitions: 2; MemorySize: 1768.0 B;
// ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)
As you can see, with cache Spark finishes processing the partition and caches it in memory. While I cannot point to the exact part of the source responsible for this behavior, it looks like a reasonable optimization: since the partition is already loaded, there is no reason to stop the job early.
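As a quick follow-up sketch (my own addition, not part of the original answer), running another action on the cached RDD should reuse the cached partition, so the accumulator only grows by the lines of the partition that was never cached:
linesWithSparkCached.count()
cacheCntCache.value
// expected to grow only by the second partition's 41 lines (54 + 41 = 95),
// since the first partition is served from the cache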
See also: Lazy foreach on a Spark RDD
Upvotes: 1