Saravanan

Spark fails when calling a Scala class method to comma-split strings

I have the following class in the Scala shell in Spark.

import org.apache.spark.rdd.RDD

class StringSplit(val query: String) {
  def getStrSplit(rdd: RDD[String]): RDD[String] = {
    rdd.map(x => x.split(query))
  }
}

I am trying to call the method in this class like this:

val inputRDD=sc.parallelize(List("one","two","three"))
val strSplit=new StringSplit(",")
strSplit.getStrSplit(inputRDD)

This step fails with the error: getStrSplit is not a member of StringSplit.

Can you please let me know what is wrong with this?

Answers (1)

Paul

It seems like a reasonable thing to do, but...

  • the result type for getStrSplit is wrong, because .split returns Array[String], not String (see the sketch after this list)
  • parallelizing List("one","two","three") stores the three strings "one", "two" and "three", none of which contain a comma to split on.
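
For reference, here is a minimal sketch with just the result type corrected (a second issue, serialization, comes up further down):

import org.apache.spark.rdd.RDD

class StringSplit(val query: String) {
  // .split(query) turns each line into an Array[String],
  // so the result is RDD[Array[String]], not RDD[String]
  def getStrSplit(rdd: RDD[String]): RDD[Array[String]] =
    rdd.map(_.split(query))
}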

Another way:

val input = sc.parallelize(List("1,2,3,4","5,6,7,8"))
input: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>

The test input here is a list of two strings that each require some comma splitting to get to the data.

Parsing the input by splitting can be as easy as:

val parsedInput = input.map(_.split(","))
parsedInput: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25

Here _.split(",") is an anonymous function with one parameter _, where Scala infers the types from the other calls rather than the types being explicitly defined.

Notice the type is RDD[Array[String]], not RDD[String].

We could extract the third element of each line (index 2) with:

parsedInput.map(_(2)).collect()
res27: Array[String] = Array(3, 7)
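
As an aside, if what you actually want is a flat RDD[String] with one element per field, flatMap does the split and the flattening in one step (a quick sketch against the same input):

val fields = input.flatMap(_.split(","))  // RDD[String]
fields.collect()                          // Array(1, 2, 3, 4, 5, 6, 7, 8)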

So what about the original question: doing the same operation in a class? I tried:

class StringSplit(query: String) {
  def get(rdd: RDD[String]) = rdd.map(_.split(query))
}

val ss = new StringSplit(",")

ss.get(input)
--->  org.apache.spark.SparkException: Task not serializable

I'm guessing that occurs because the class instance is not serialized to each worker: Spark tries to ship the split closure, but that closure references query, a member of the enclosing instance, so the whole (non-serializable) instance would have to be shipped along with it. A parameter-free class avoids the problem:

scala> class commaSplitter {
     def get(rdd:RDD[String])=rdd.map(_.split(","));
     }
defined class commaSplitter

scala> val cs = new commaSplitter;
cs: commaSplitter = $iwC$$iwC$commaSplitter@262f1580

scala> cs.get(input);
res29: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[23] at map at <console>:10

scala> cs.get(input).collect()
res30: Array[Array[String]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))

This parameter-free class works.
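
Besides making the class serializable (see the EDIT below), another standard workaround is to copy the field into a local val inside the method, so the closure captures only that local value rather than the whole instance. A sketch:

class StringSplit(query: String) {
  def get(rdd: RDD[String]): RDD[Array[String]] = {
    val q = query        // local copy: the closure captures q, not `this`
    rdd.map(_.split(q))
  }
}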

EDIT

You can tell Scala you want your class to be serializable by adding extends Serializable, like so:

scala> class stringSplitter(s:String) extends Serializable {
     def get(rdd:RDD[String]) = rdd.map(_.split(s)); 
     }
defined class stringSplitter

scala> val ss = new stringSplitter(",");
ss: stringSplitter = $iwC$$iwC$stringSplitter@2a33abcd

scala> ss.get(input)
res33: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[25] at map at <console>:10

scala> ss.get(input).collect()
res34: Array[Array[String]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))

and this works.
