user482963

Reputation: 342

Spark RDD.forEach re-initializing external object

I am probably missing a basic Spark concept. I am trying to convert an RDD of Integers into a comma-separated String. Currently I do it by collecting the RDD as a List and using its Iterator. However, profiling the JVM shows that all the work happens in a single thread, which does not look efficient. Hence I am trying to call the foreach method on the RDD itself, but it behaves strangely. Below is my unit test:

JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));

StringBuilder sb = new StringBuilder("");

rdd.foreach(t -> {
    System.out.println(String.valueOf(t));

    if (sb.length() > 0)
        sb.append(",");
    sb.append(String.valueOf(t));

    System.out.println(sb);
});

System.out.println(sb.length());

the output:

1
3
2
2
3
1
0

Apparently the StringBuilder gets re-instantiated at each call. Is there another way to do this?
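For reference, the collect-based baseline described above boils down to a single join on the driver. A minimal plain-Java sketch, with the RDD replaced by an ordinary List so no Spark dependency is assumed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectJoin {
    // Equivalent of the StringBuilder loop over rdd.collect(): a single join.
    static String toCsv(List<Integer> collected) {
        return collected.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Stand-in for rdd.collect(): the driver receives all elements as a List.
        System.out.println(toCsv(Arrays.asList(1, 2, 3))); // prints "1,2,3"
    }
}
```

This is the single-threaded path the question is trying to avoid; the work only parallelizes if the string-building itself is moved into the executors.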

Upvotes: 0

Views: 538

Answers (2)

SagarKhandagale

Reputation: 184

You can also do it with mapPartitions. That way each partition is processed in parallel, and the per-partition results are collected together at the end.

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7), 5) // This will have five partitions

val rdd3 = rdd1.mapPartitions(x => {
  val str = x.mkString(",")
  List(str).iterator
}) // Here we build one comma-separated string per partition

val test1 = rdd3.collect.filterNot(x => {
  x.equals("")
}) // filterNot is required because the number of partitions can exceed the number of elements (based on the spark.default.parallelism property), so empty partitions produce "" strings.

For Java you can try the code below:

JavaRDD<Integer> rdd1 = jsc.parallelize(list);

JavaRDD<String> collection = rdd1.mapPartitions((Iterator<Integer> iter) -> {
    ArrayList<String> out = new ArrayList<String>();
    StringBuffer strbf = new StringBuffer("");

    while (iter.hasNext()) {
        if (strbf.length() > 0)
            strbf.append(",");      // separate elements within a partition
        strbf.append(iter.next());
    }

    out.add(strbf.toString());
    return out.iterator();
});

StringBuffer strbfFinal = new StringBuffer("");

collection.collect().forEach(item -> {
    if (!"".equals(item)) {
        if (strbfFinal.length() > 0)
            strbfFinal.append(",");  // separate the per-partition strings
        strbfFinal.append(item);
    }
});

The StringBuffer holds your appended list of numbers.
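To see why the empty-string filter matters, the per-partition logic can be simulated without Spark: split the data into chunks (possibly more chunks than elements), join each chunk with commas, drop the empty results, then join across chunks. A hypothetical plain-Java sketch (partitioning is done by hand here; Spark's actual partitioning differs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionJoin {
    // Simulates mapPartitions: join each partition's elements with commas.
    static List<String> mapPartitions(List<List<Integer>> partitions) {
        List<String> out = new ArrayList<>();
        for (List<Integer> part : partitions) {
            out.add(part.stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(",")));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hand-built partitions, including an empty one (as can happen in Spark).
        List<List<Integer>> partitions = List.of(
                List.of(1, 2), List.of(3, 4), List.of(), List.of(5, 6, 7));

        String csv = mapPartitions(partitions).stream()
                .filter(s -> !s.isEmpty())   // the filterNot("") step from the answer
                .collect(Collectors.joining(","));

        System.out.println(csv); // prints "1,2,3,4,5,6,7"
    }
}
```

The empty partition contributes an empty string, which would otherwise create a stray `,,` in the final result.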

Upvotes: 1

Balaji Reddy

Reputation: 5700

Since foreach returns Unit/void in Spark, you need to rely on something centralized; in this case we can use accumulators. Built-in accumulators are meant for numeric values, so we need to build our own accumulator for Strings.

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {

  def addInPlace(accum: String, current: String): String = {
    s"$accum $current".trim // trim avoids a leading space from the empty zero value
  }

  def zero(initialValue: String): String = {
    ""
  }
}

Then use the accumulator to collect your value.

val sc = prepareConfig()

val acc = sc.accumulator("")(StringAccumulator)

val baseRDD = sc.parallelize(Seq(1, 2, 3))

baseRDD.foreach { x => acc.++=(x.toString()) }
println(acc.value)

Result : 1 2 3

Solution in Scala.
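The accumulator contract above reduces to a zero (identity) value plus an associative merge, which is what lets Spark combine partial results from different executors. A plain-Java sketch of the same String-accumulator semantics (no Spark types; the helper names are hypothetical):

```java
import java.util.List;

public class StringAccumulatorSketch {
    // Mirrors zero(...) in the Scala AccumulatorParam: the identity value.
    static String zero() {
        return "";
    }

    // Mirrors addInPlace: merge two partial results, space-separated.
    static String addInPlace(String accum, String current) {
        return accum.isEmpty() ? current : accum + " " + current;
    }

    public static void main(String[] args) {
        String acc = zero();
        for (Integer x : List.of(1, 2, 3)) {  // stand-in for baseRDD.foreach
            acc = addInPlace(acc, String.valueOf(x));
        }
        System.out.println(acc); // prints "1 2 3"
    }
}
```

Note that in a real cluster the merge order across partitions is not guaranteed, so the elements may come back in any order.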

Upvotes: 0
