Reputation: 11
Running Windows 8.1, Java 1.8, Scala 2.10.5, Spark 1.4.1, Scala IDE (Eclipse 4.4), IPython 3.0.0 and Jupyter Scala.
I'm relatively new to Scala and Spark, and I'm seeing an issue where certain RDD actions like collect and first return a "Task not serializable" error. What's unusual to me is that I see the error in IPython notebooks with the Scala kernel and in the Scala IDE, but when I run the same code directly in the spark-shell I do not get it.
I would like to set up these two environments for more advanced code evaluation beyond the shell. I have little expertise in troubleshooting this type of issue and determining what to look for; if you can provide guidance on how to get started resolving this kind of issue, it would be greatly appreciated.
Code:
val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val sample = sc.parallelize(sc.textFile(logFile).take(100).map(line => line.replace("'","").replace("\"","")).map(line => line.substring(0,line.length()-1)))
val header = sample.first
val data = sample.filter(_!= header)
data.take(1)
data.count
data.collect
Stack trace:
org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd49$$user$$anonfun$4.apply(Main.scala:188)
cmd49$$user$$anonfun$4.apply(Main.scala:187)
java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
- object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@5976e363)
- field (class: cmd12$$user, name: conf, type: class org.apache.spark.SparkConf)
- object (class cmd12$$user, cmd12$$user@39a7edac)
- field (class: cmd49, name: $ref$cmd12, type: class cmd12$$user)
- object (class cmd49, cmd49@3c2a0c4f)
- field (class: cmd49$$user, name: $outer, type: class cmd49)
- object (class cmd49$$user, cmd49$$user@774ea026)
- field (class: cmd49$$user$$anonfun$4, name: $outer, type: class cmd49$$user)
- object (class cmd49$$user$$anonfun$4, <function0>)
- field (class: cmd49$$user$$anonfun$4$$anonfun$apply$3, name: $outer, type: class cmd49$$user$$anonfun$4)
- object (class cmd49$$user$$anonfun$4$$anonfun$apply$3, <function1>)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd49$$user$$anonfun$4.apply(Main.scala:188)
cmd49$$user$$anonfun$4.apply(Main.scala:187)
Upvotes: 1
Views: 207
Reputation: 40370
@Ashalynd was right about the fact that sc.textFile already creates an RDD; you don't need sc.parallelize in that case (documentation here).
So considering your example, this is what you'll need to do:
// Read your data from S3
val logFile = "s3n://[key:[key secret]@mortar-example-data/airline-data"
val rawRDD = sc.textFile(logFile)
// Fetch the header
val header = rawRDD.first
// Filter out the header, then map to clean each line
val sample = rawRDD.filter(!_.contains(header)).map { line =>
  line.replaceAll("['\"]", "").init // strip the quotes, then drop the trailing character
}.takeSample(false, 100, 12L) // takeSample returns a fixed-size sampled subset of this RDD in an array
It's better to use the takeSample function:
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
withReplacement : whether sampling is done with replacement
num : size of the returned sample
seed : seed for the random number generator
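For illustration, here is a minimal sketch of takeSample on a toy RDD (assuming a running SparkContext named sc; the values are arbitrary):
// build a small RDD just to demonstrate the call
val numbers = sc.parallelize(1 to 1000)
// draw 5 elements without replacement, with a fixed seed for reproducibility
val picked: Array[Int] = numbers.takeSample(withReplacement = false, num = 5, seed = 42L)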
Note 1: the sample is an Array[String], so if you wish to transform it into an RDD, you can use the parallelize function as follows:
val sampleRDD = sc.parallelize(sample.toSeq)
Note 2: if you wish to get a sample RDD directly from your rawRDD.filter(...).map(...), you can use the sample function, which returns an RDD[T]. However, you'll need to specify a fraction of the data instead of a specific number of elements.
Upvotes: 1
Reputation: 12563
sc.textFile already creates a distributed dataset (check the documentation). You don't need sc.parallelize around it, but, as eliasah properly noted, because take(100) returns a local collection, you do need to turn that result back into an RDD if you want an RDD.
val selection = sc.textFile(logFile). // RDD
  take(100).                          // local collection
  map(_.replaceAll("['\"]", "")).     // use a regex to strip both quote characters
  map(_.init)                         // init returns all characters except the last
// turn the resulting collection into an RDD again
val sample = sc.parallelize(selection)
Upvotes: 0