David Portabella

Reputation: 12740

In Spark, are some operations executed before an action is called?

I have the following simple Spark example:

#1 val lines: RDD[String] = sc.textFile("/data/non_existing_file.txt")
#2 val words: RDD[String] = lines.flatMap(line => line.split(" "))
#3 val pairs: RDD[(String, Int)] = words.map(word => (word, 1))
#4 val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
#5 counts.saveAsTextFile("/tmp/result")

When I run the program, I get the exception "Input path does not exist: file:/data/non_existing_file.txt", as expected.

What is awkward is that I get this exception on line #4. I understand that I don't get this error on lines #1, #2, or #3, because the computation has not been executed yet. The computation is executed only at line #5, when I call the action that writes the result to a file. So why do I get an exception on line #4 instead of line #5?

Upvotes: 1

Views: 78

Answers (1)

zero323

Reputation: 330453

This happens under two conditions:

  • spark.default.parallelism is not set.
  • You provide neither a Partitioner nor the number of partitions for reduceByKey.

In this case reduceByKey eagerly creates a new HashPartitioner with the number of partitions equal to the number of partitions of the parent RDD. To obtain that number, it has to compute the input splits, which requires the file to be present at the input path. Since the file is missing, you get the error there.
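A minimal sketch of the workaround that follows from this (not part of the original post; it assumes the same spark-shell-provided sc as the question): passing an explicit partition count to reduceByKey lets it build its HashPartitioner directly, without inspecting the parent RDD's partitions, so the missing-file error only surfaces at the action.

import org.apache.spark.rdd.RDD

val lines: RDD[String] = sc.textFile("/data/non_existing_file.txt")
val pairs: RDD[(String, Int)] = lines.flatMap(line => line.split(" ")).map(word => (word, 1))

// With an explicit number of partitions, reduceByKey constructs
// HashPartitioner(4) directly and never touches lines' partitions here.
val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _, 4)

// The "Input path does not exist" exception is now thrown only by the action.
counts.saveAsTextFile("/tmp/result")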

The actual reduceByKey operation is performed only when an action is called.

This is a problem very similar to Why does sortBy transformation trigger a Spark job?

Upvotes: 1
