Reputation: 120
I have a running program which performs a series of transformations on an input RDD of Integers. After all the transformations are done, I want to find the sum of all those numbers, ending up with a single integer, and store that value in an output file.
I am able to get the final integer value by performing the reduce action on the RDD. But after this action is performed, what I get back is an integer (not an RDD), so I cannot use the RDD.saveAsTextFile("/output_location") method.
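A minimal sketch of what I mean (the names here are placeholders, not my actual pipeline):

// Placeholder pipeline: 'data' stands in for my transformed RDD
val data = sc.parallelize(Array(1, 2, 3))
// reduce is an action: it returns a plain Int, not an RDD
val finalvalue: Int = data.reduce(_ + _)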
Also, it gives me an error when I try to create an RDD for that integer:
// finalvalue is my variable that contains the output value to be stored.
val out = sc.parallelize(finalvalue);
The error: Type Mismatch, expected Seq[NotInferedT], actual: Int
Can someone explain to me why I can't store a single value in an RDD, or do I have to convert it into an Array?
Upvotes: 2
Views: 3500
Reputation: 509
So, let us consider the scenario and walk through it one step at a time:
// Creating the RDD named x which stores 3 integers
scala> val x = sc.parallelize(Array(1,2,3))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:12
// Now calculating the sum (note that sum() returns a Double), converting it to an Int and storing it in the mysum variable
scala> val mysum = x.sum().toInt
[Stage 1:> (0 + 0) / 2]mysum: Int = 6
So far so good. Now when you try to create an RDD just by calling sc.parallelize() on mysum, the following happens:
scala> val sumRDD = sc.parallelize(mysum)
<console>:16: error: type mismatch;
found : Int
required: Seq[?]
Error occurred in an application involving default arguments.
val sumRDD = sc.parallelize(mysum)
^
As the error suggests, you can only provide a sequence as an argument to the parallelize method. To solve this, you either need to pass an Array (or a Seq, etc.) or a String to parallelize:
// So either do this
scala> val sumRDD = sc.parallelize(mysum.toString)
sumRDD: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[4] at parallelize at <console>:16
// or do this
scala> val sumRDD = sc.parallelize(Array(mysum))
sumRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:16
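One caveat worth noting (my addition, not shown in the output above): the two variants are not equivalent. A String is treated as a sequence of characters, so for a multi-digit sum the first variant splits the digits into separate Char elements, and saveAsTextFile would then write each character on its own line. The Array variant keeps the sum as a single Int element, which is usually what you want:

// A String parallelizes to an RDD[Char], splitting a multi-digit value apart
val chars = sc.parallelize(42.toString)   // RDD[Char] with elements '4' and '2'
// Wrapping in an Array keeps the value intact as one element
val single = sc.parallelize(Array(42))    // RDD[Int] with the single element 42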
The next step is to store it at a particular path: a 'file://' URI stores it on the local machine, and an 'hdfs://' URI stores it on HDFS.
scala> sumRDD.saveAsTextFile("file:///home/hdfs/a12")
15/08/19 16:14:57 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
And a directory is created at the above-mentioned path, with part files inside it.
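As an aside, if you want the sum in a single part file rather than spread across several, you can force a single partition when creating the RDD; parallelize accepts the number of slices as a second argument (the output path below is illustrative):

// One partition => saveAsTextFile produces a single part-00000 file
val sumRDD = sc.parallelize(Array(mysum), 1)
sumRDD.saveAsTextFile("file:///home/hdfs/a12_single")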
Upvotes: 2
Reputation: 27455
Basically the situation is that you have an Int and you want to write it to a file. Your first thought is to create a distributed collection across a cluster of machines that only contains this Int, and let those machines write the Int to a set of files in a distributed way.
I'd argue this is not the right approach. Do not use Spark for saving an Int into a file. Instead you can use a PrintWriter:
val out = new java.io.PrintWriter("filename.txt")
out.println(finalvalue)
out.close()
This will only work for writing to a local file. If you want to write to HDFS, it gets more complicated.
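For completeness, here is a sketch of the HDFS case using the Hadoop FileSystem API (my addition; it assumes your Hadoop configuration is available on the classpath, and the output path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Obtain the filesystem from the (assumed) default Hadoop configuration
val fs = FileSystem.get(new Configuration())
// fs.create returns an output stream we can wrap in a PrintWriter
val out = new java.io.PrintWriter(fs.create(new Path("/output_location/sum.txt")))
try out.println(finalvalue) finally out.close()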
Upvotes: 4