Reputation: 29
I have the following class in the Scala shell in Spark.
import org.apache.spark.rdd.RDD

class StringSplit(val query: String) {
  def getStrSplit(rdd: RDD[String]): RDD[String] = {
    rdd.map(x => x.split(query))
  }
}
I am trying to call the method in this class like this:
val inputRDD = sc.parallelize(List("one", "two", "three"))
val strSplit = new StringSplit(",")
strSplit.getStrSplit(inputRDD)
-> This step fails with the error: getStrSplit is not a member of StringSplit.
Can you please let me know what is wrong with this?
Upvotes: 1
Views: 687
Reputation: 27433
It seems like a reasonable thing to do, but...
The declared return type of getStrSplit is wrong, because .split returns Array[String], not String.
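A sketch of the corrected declaration, combining the return-type fix with the extends Serializable fix shown in the EDIT at the end of this answer:
import org.apache.spark.rdd.RDD

class StringSplit(val query: String) extends Serializable {
  // split yields an Array[String] per input line, so the result is RDD[Array[String]]
  def getStrSplit(rdd: RDD[String]): RDD[Array[String]] = rdd.map(_.split(query))
}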
Another way:
val input = sc.parallelize(List("1,2,3,4","5,6,7,8"))
input: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>
The test input here is a list of two strings that each require some comma splitting to get to the data.
Parsing the input by splitting can be as easy as:
val parsedInput = input.map(_.split(","))
parsedInput: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:25
Here _.split(",") is an anonymous function with a single parameter (the _), where Scala infers the types from the surrounding calls rather than requiring them to be declared explicitly.
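For comparison, the same call with the parameter and its type written out explicitly (the name parsedInputExplicit is just for illustration):
// equivalent to parsedInput above
val parsedInputExplicit = input.map((line: String) => line.split(","))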
Notice the type is RDD[Array[String]], not RDD[String].
We could extract the third element (index 2) of each line with
parsedInput.map(_(2)).collect()
res27: Array[String] = Array(3, 7)
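If an RDD[String] of the individual fields is what the original method signature was aiming for (my reading, not something the question states), flatMap flattens each per-line array into a single stream of strings:
val fields = input.flatMap(_.split(","))
// fields.collect() gives Array(1, 2, 3, 4, 5, 6, 7, 8)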
So how about the original question: doing the same operation in a class? I tried:
class StringSplit(query:String){
  def get(rdd:RDD[String]) = rdd.map(_.split(query));
}
val ss = new StringSplit(",");
ss.get(input);
---> org.apache.spark.SparkException: Task not serializable
I'm guessing that occurs because the class is not serializable, so it cannot be shipped to each worker: Spark tries to send the split function, but that function refers to query, a field of an instance that cannot be sent along with it. As a check, a class with no constructor parameter avoids the capture entirely:
scala> class commaSplitter {
def get(rdd:RDD[String])=rdd.map(_.split(","));
}
defined class commaSplitter
scala> val cs = new commaSplitter;
cs: commaSplitter = $iwC$$iwC$commaSplitter@262f1580
scala> cs.get(input);
res29: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[23] at map at <console>:10
scala> cs.get(input).collect()
res30: Array[Array[String]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))
This parameter-free class works.
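Another common workaround (a sketch on my part, not something tested above): copy the constructor argument into a local val inside the method, so the closure captures only the String rather than the enclosing instance:
class StringSplit(query: String) {
  def get(rdd: RDD[String]) = {
    val q = query // local copy: the closure captures q (a plain String), not `this`
    rdd.map(_.split(q))
  }
}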
EDIT
You can tell Scala you want your class to be serializable by adding extends Serializable, like so:
scala> class stringSplitter(s:String) extends Serializable {
def get(rdd:RDD[String]) = rdd.map(_.split(s));
}
defined class stringSplitter
scala> val ss = new stringSplitter(",");
ss: stringSplitter = $iwC$$iwC$stringSplitter@2a33abcd
scala> ss.get(input)
res33: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[25] at map at <console>:10
scala> ss.get(input).collect()
res34: Array[Array[String]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))
and this works, because the instance (including the query string) can now be serialized and shipped to the workers.
Upvotes: 1