srinivas amara

Reputation: 155

Error when reading sequence file content with rdd.collect(), whereas rdd.foreach(println) works, in Spark

As a small exercise, I am trying to save some data into a sequence file, read the same data back, and print it. I can see the data when I use rdd.foreach(println), but I get an error when I call rdd.collect(). What exactly happens in each case? In the second case the error is "object not serializable". I am running these commands in the Spark REPL, and my Spark version is 1.5.1.

Saving the sequence file:

val data = sc.parallelize(List(("Hadoop", 1), ("Spark", 2), ("Kafka", 3)))
data.saveAsSequenceFile("file:///home/user/outputfiles/seqFileDemo1")
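
(For context: as far as I understand, saveAsSequenceFile wraps each key and value in Hadoop Writables via implicit conversions before writing. A rough explicit equivalent, sketched here with an illustrative output path, would be:)

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileOutputFormat

// Wrap each pair in Writables and write them with SequenceFileOutputFormat;
// this is roughly what saveAsSequenceFile does for us.
data
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsHadoopFile[SequenceFileOutputFormat[Text, IntWritable]]("file:///home/user/outputfiles/seqFileDemo1-explicit")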

=================

Reading the sequence file:

import org.apache.hadoop.io.{Text, IntWritable} 
val data1 = sc.sequenceFile("file:///home/user/outputfiles/seqFileDemo1", classOf[Text], classOf[IntWritable])
data1.foreach(println)

Output:

("Hadoop", 1), 
("Spark", 2), 
("Kafka", 3)




scala> data1.collect()

Output:

17/02/14 16:56:33 INFO SparkContext: Starting job: collect at <console>:19
17/02/14 16:56:33 INFO DAGScheduler: Got job 5 (collect at <console>:19) with 2 output partitions
17/02/14 16:56:33 INFO DAGScheduler: Final stage: ResultStage 5(collect at <console>:19)
17/02/14 16:56:33 INFO DAGScheduler: Parents of final stage: List()
17/02/14 16:56:33 INFO DAGScheduler: Missing parents: List()
17/02/14 16:56:33 INFO DAGScheduler: Submitting ResultStage 5 (file:///home/srini/Desktop/SeqFileData/ HadoopRDD[3] at sequenceFile at <console>:16), which has no missing parents
17/02/14 16:56:33 INFO MemoryStore: ensureFreeSpace(2440) called with curMem=694040, maxMem=556038881
17/02/14 16:56:33 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.4 KB, free 529.6 MB)
17/02/14 16:56:33 INFO MemoryStore: ensureFreeSpace(1448) called with curMem=696480, maxMem=556038881
17/02/14 16:56:33 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1448.0 B, free 529.6 MB)
17/02/14 16:56:33 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:60094 (size: 1448.0 B, free: 530.2 MB)
17/02/14 16:56:33 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:861
17/02/14 16:56:33 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 5 (file:///home/srini/Desktop/SeqFileData/ HadoopRDD[3] at sequenceFile at <console>:16)
17/02/14 16:56:33 INFO TaskSchedulerImpl: Adding task set 5.0 with 2 tasks
17/02/14 16:56:33 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 10, localhost, PROCESS_LOCAL, 2156 bytes)
17/02/14 16:56:33 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 11, localhost, PROCESS_LOCAL, 2156 bytes)
17/02/14 16:56:33 INFO Executor: Running task 1.0 in stage 5.0 (TID 11)
17/02/14 16:56:33 INFO Executor: Running task 0.0 in stage 5.0 (TID 10)
17/02/14 16:56:33 INFO HadoopRDD: Input split: file:/home/srini/Desktop/SeqFileData/part-00000:0+104
17/02/14 16:56:33 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 11)
java.io.NotSerializableException: org.apache.hadoop.io.Text
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (Hadoop,1))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
17/02/14 16:56:33 INFO HadoopRDD: Input split: file:/home/srini/Desktop/SeqFileData/part-00001:0+121
17/02/14 16:56:33 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 10)
java.io.NotSerializableException: org.apache.hadoop.io.Text
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.Text, value: Kafka)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (Kafka,3))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
17/02/14 16:56:33 ERROR TaskSetManager: Task 1.0 in stage 5.0 (TID 11) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (Hadoop,1))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1); not retrying
17/02/14 16:56:33 INFO TaskSchedulerImpl: Cancelling stage 5
17/02/14 16:56:33 INFO TaskSchedulerImpl: Stage 5 was cancelled
17/02/14 16:56:33 INFO DAGScheduler: ResultStage 5 (collect at <console>:19) failed in 0.022 s
17/02/14 16:56:33 INFO DAGScheduler: Job 5 failed: collect at <console>:19, took 0.083701 s
17/02/14 16:56:33 ERROR TaskSetManager: Task 0.0 in stage 5.0 (TID 10) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.Text, value: Kafka)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (Kafka,3))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 2); not retrying
17/02/14 16:56:33 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 5.0 (TID 11) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (Hadoop,1))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC.<init>(<console>:30)
    at $iwC.<init>(<console>:32)
    at <init>(<console>:34)
    at .<init>(<console>:38)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Upvotes: 0

Views: 923

Answers (1)

Yuval Itzchakov

Reputation: 149538

The problem is that Text and IntWritable aren't serializable. foreach(println) doesn't fail because it executes on the executors and never ships the records anywhere (and since you're running the REPL in local mode, the executors share the driver's JVM, which is why the output shows up in your console). collect(), on the other hand, has to serialize every element and send it back to the driver, and that is where it blows up. To make it work, first convert Text and IntWritable to String and Int respectively, which are serializable:

import org.apache.hadoop.io.{IntWritable, Text}

val result =
  sc
    .sequenceFile[Text, IntWritable]("file:///home/user/outputfiles/seqFileDemo1")
    .map(tup => (tup._1.toString, tup._2.get())) // extract serializable String/Int from the Writables
    .collect()
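
Alternatively, SparkContext.sequenceFile has an overload that takes implicit WritableConverters and performs this conversion per record as the file is read, so (if I recall the 1.5.x API correctly) you can ask for the Scala types directly:

val result =
  sc
    .sequenceFile[String, Int]("file:///home/user/outputfiles/seqFileDemo1")
    .collect() // Text/IntWritable are converted to String/Int per record, before collect serializes anything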

Upvotes: 3
