TJB
TJB

Reputation: 3846

Error while performing collect action on Spark RDD

I am new to Spark, and just installed it using brew. In an iPython notebook, I created an RDD that's just a list of strings. I run some transformations on it, one being a mapping function that makes all items in the list a tuple, and the other being a reduceByKey function.

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y)
                       .collect())
print(wordCountsCollected)

Everything works until I run collect on it. I get this Traceback.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-88b98f07e38a> in <module>()
      4 wordCountsCollected = (wordsRDD
      5                        .map(lambda w: (w, 1))
----> 6                        .reduceByKey(lambda x,y: x+y)
      7                        .collect())
      8 print(wordCountsCollected)

/usr/local/opt/apache-spark/libexec/python/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 36.0 failed 1 times, most recent failure: Lost task 2.0 in stage 36.0 (TID 97, localhost, executor driver): java.lang.VerifyError: Bad instruction
Exception Details:
  Location:
    org/apache/spark/storage/ShuffleIndexBlockId.shuffleId()I @4: <illegal>
  Reason:
    Error exists in the bytecode
  Bytecode:
    0x0000000: 2ab4 0029 ec                           

    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getIndexFile(IndexShuffleBlockResolver.scala:59)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:141)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor53.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.VerifyError: Bad instruction
Exception Details:
  Location:
    org/apache/spark/storage/ShuffleIndexBlockId.shuffleId()I @4: <illegal>
  Reason:
    Error exists in the bytecode
  Bytecode:
    0x0000000: 2ab4 0029 ec                           

    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getIndexFile(IndexShuffleBlockResolver.scala:59)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:141)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

iPython Notebook Imports

%pylab inline
import pandas as pd
import seaborn as sns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

Simple command in spark-shell

Input >>> scala> spark.range(10).show
Output >>> 
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

Upvotes: 1

Views: 2988

Answers (1)

TJB
TJB

Reputation: 3846

I figured out the issue to my problem. I was running a tutorial on iPython notebook, and it must be running on python2.7, a previous version of python that I had mainly running on my computer. I should've been tipped off by all the print statements that used the old syntax "print x" instead of print(x). Anyway an important library for using Apache Spark on iPython notebooks, findspark, was installed on python3.5, but not on python2.7, and after installing it on the older python version everything was copacetic. Moral of the story, Always Check Your Python Version :)! Thanks for your help everyone.

Upvotes: 1

Related Questions