Reputation: 342
I am probably missing some basic Spark concept. I am trying to convert an RDD of Integers to a comma-separated String. Currently I do it by collecting the RDD as a List and using its Iterator. However, on profiling the JVM, it appears that all the work happens in a single thread, which does not look efficient. Hence, I am trying to call the foreach method on the RDD itself, but it behaves strangely. Below is my unit test:
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));
StringBuilder sb = new StringBuilder("");
rdd.foreach(t -> {
    System.out.println(String.valueOf(t));
    if (sb.length() > 0)
        sb.append(",");
    sb.append(String.valueOf(t));
    System.out.println(sb);
});
System.out.println(sb.length());
The output:
1
3
2
2
3
1
0
Apparently the StringBuilder gets re-instantiated at each call. Is there another way to do this?
Upvotes: 0
Views: 538
Reputation: 184
You can also do it using mapPartitions. That way each partition is processed in parallel, and you collect the partial results together at the end.
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7), 5) // This will have five partitions
val rdd3 = rdd1.mapPartitions(x => {
  val str = x.mkString(",")
  List(str).iterator
}) // Here we create one comma-separated string per partition
val test1 = rdd3.collect.filterNot(x => {
  x.equals("")
}) // filterNot is required because the number of partitions can exceed the number of elements in the sequence (based on the spark.default.parallelism property), so partitions with no elements generate "" strings.
For Java you can try the code below:
JavaRDD<Integer> rdd1 = jsc.parallelize(list);
JavaRDD<String> collection = rdd1.mapPartitions((Iterator<Integer> iter) -> {
    ArrayList<String> out = new ArrayList<String>();
    StringBuffer strbf = new StringBuffer();
    while (iter.hasNext()) {
        if (strbf.length() > 0) {
            strbf.append(","); // separator between elements within a partition
        }
        strbf.append(iter.next());
    }
    out.add(strbf.toString());
    return out.iterator();
});
StringBuffer strbfFinal = new StringBuffer();
collection.collect().forEach(item -> {
    if (!"".equals(item)) {
        if (strbfFinal.length() > 0) {
            strbfFinal.append(","); // separator between partition strings
        }
        strbfFinal.append(item);
    }
});
The StringBuffer now holds your appended, comma-separated list of numbers.
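Once the per-partition strings are collected on the driver, the final join can also be written more compactly with the Streams API. A minimal sketch (the literal list below stands in for the result of `collection.collect()`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinPartitions {
    public static void main(String[] args) {
        // Stand-in for collection.collect(); some entries may be ""
        // when a partition held no elements.
        List<String> parts = Arrays.asList("1,2,3", "", "4,5");

        // Drop empty partition strings, then join the rest with commas.
        String result = parts.stream()
                .filter(s -> !s.isEmpty())
                .collect(Collectors.joining(","));

        System.out.println(result); // 1,2,3,4,5
    }
}
```

`Collectors.joining(",")` inserts the separator only between elements, so the empty-string filter is the only extra handling needed.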
Upvotes: 1
Reputation: 5700
Since foreach returns Unit/void in Spark, you need to rely on something centralized; in this case, accumulators come to mind. Accumulators are meant for numerical values, so we need to build our own Accumulator of String.
import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def addInPlace(accum: String, current: String): String = {
    s"$accum $current"
  }
  def zero(initialValue: String): String = {
    ""
  }
}
Then use the accumulator to collect your value.
val sc = prepareConfig()
val acc = sc.accumulator("")(StringAccumulator)
val baseRDD = sc.parallelize(Seq(1, 2, 3))
baseRDD.foreach { x => acc ++= x.toString }
println(acc.value)
Result : 1 2 3
Solution in Scala.
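The merge logic of such a string accumulator can be sketched in plain Java (a hypothetical helper, not part of the Spark API): using an empty zero value lets the first merge skip the separator, so the result comes out comma-separated instead of space-separated.

```java
public class StringMerge {
    // Mirrors addInPlace from the accumulator above: the zero value ""
    // means the first merge contributes no leading separator.
    static String addInPlace(String accum, String current) {
        return accum.isEmpty() ? current : accum + "," + current;
    }

    public static void main(String[] args) {
        String acc = "";                    // the accumulator's zero value
        for (int x : new int[] {1, 2, 3}) { // simulates foreach over the RDD
            acc = addInPlace(acc, String.valueOf(x));
        }
        System.out.println(acc); // 1,2,3
    }
}
```

Note that with a real distributed foreach, the order in which elements reach the accumulator is not guaranteed, so the resulting string may list the numbers in any order.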
Upvotes: 0