Reputation: 12740
I have the following spark simple example:
#1 val lines: RDD[String] = sc.textFile("/data/non_existing_file.txt")
#2 val words: RDD[String] = lines.flatMap(line => line.split(" "))
#3 val pairs: RDD[(String, Int)] = words.map(word => (word, 1))
#4 val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
#5 counts.saveAsTextFile("/tmp/result")
When I run the program, I get the exception "Input path does not exist: file:/data/non_existing_file.txt", as expected.
What is awkward is that I get this exception on line #4. I understand why I don't get this error on line #1, #2, or #3: the computation has not been executed yet. The computation is executed only at line #5, where the action writes the result to a file. So why do I get the exception on line #4 instead of line #5?
Upvotes: 1
Views: 78
Reputation: 330453
This happens when two conditions are met:
- spark.default.parallelism is not set, and
- you provide neither a Partitioner nor the number of partitions for reduceByKey.

In this case reduceByKey eagerly creates a new HashPartitioner with the number of partitions equal to the number of partitions of the parent RDD. To obtain that number, it has to compute the input splits. This requires the presence of a file in the input path, which seems to be missing, hence the error.
The actual reduceByKey operation will be performed only when an action is called.
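For illustration, here is a minimal sketch assuming the same spark-shell session as the snippet above; the pairs.partitions probe and the partition count 4 are illustrative additions, not part of the original code:

// Asking for the partition metadata alone reproduces the exception,
// because computing the partitions of a text-file-backed RDD requires
// computing the input splits:
pairs.partitions  // Input path does not exist: file:/data/non_existing_file.txt

// Passing the number of partitions explicitly lets reduceByKey build its
// HashPartitioner without asking the parent RDD for its partition count,
// so line #4 no longer fails eagerly:
val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _, 4)

// The exception now surfaces only at the action, on line #5:
counts.saveAsTextFile("/tmp/result")

Alternatively, setting spark.default.parallelism would satisfy the first condition above and likewise keep line #4 lazy.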
This is a problem very similar to Why does sortBy transformation trigger a Spark job?
Upvotes: 1