LearneR
LearneR

Reputation: 2551

'rdd.count()' works but 'rdd.first()' fails with Py4JJava Error in Jupyter notebook

I've totally new to Spark. I'm using version Spark 2.3 and Python version 3.7. On Windows 10, by the way. I'm launching a Jupyter Notebook to perform the PySpark operations. I'm following a course on Pluralsight (Getting Started with Spark 2.0)

I'm launching pyspark in Jupyter using the below commands in Anaconda Command Prompt:

set PYSPARK_DRIVER_PYTHON=jupyter set PYSPARK_DRIVER_PYTHON_OPTS=notebook pyspark

After the notebook opens up:

I run the below commands:

sc

from pyspark.sql.types import Row
from datetime import datetime

simple_data = sc.parallelize([1, "Alice", 50])
simple_data

simple_data.count()

simple_data.first()

Now, here is where it fails: simple_data.first() with the below error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-cc577dea1d9b> in <module>
----> 1 simple_data.first()

    C:\spark\python\pyspark\rdd.py in first(self)
   1374         ValueError: RDD is empty
   1375         """
-> 1376         rs = self.take(1)
   1377         if rs:
   1378             return rs[0]

C:\spark\python\pyspark\rdd.py in take(self, num)
   1356 
   1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
   1359 
   1360             items += res

C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    999         # SparkContext#runJob.
   1000         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
   1003 

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

It's a lot of error log than I pasted here. I've looked up for possible solutions and I've updated Java jdk using conda install -c cyclus java-jdk but even after that, nothing changed.

I'm kinda stuck and can't proceed with my course. Why does it work for .count() but not for .first() How to resolve this error? What am I missing?

Adding full error message after trying out @Sparker0i's suggestion in the answer:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-4dbbd81a7c5c> in <module>
      2 #simple_data
      3 
----> 4 simple_data = sc.parallelize([[1, "Alice", 50]]).toDF()
      5 simple_data.count()
      6 simple_data.first()

C:\spark\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
     56         [Row(name=u'Alice', age=1)]
     57         """
---> 58         return sparkSession.createDataFrame(self, schema, sampleRatio)
     59 
     60     RDD.toDF = toDF

C:\spark\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    685 
    686         if isinstance(data, RDD):
--> 687             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    688         else:
    689             rdd, schema = self._createFromLocal(map(prepare, data), schema)

C:\spark\python\pyspark\sql\session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    382         """
    383         if schema is None or isinstance(schema, (list, tuple)):
--> 384             struct = self._inferSchema(rdd, samplingRatio, names=schema)
    385             converter = _create_converter(struct)
    386             rdd = rdd.map(converter)

C:\spark\python\pyspark\sql\session.py in _inferSchema(self, rdd, samplingRatio, names)
    353         :return: :class:`pyspark.sql.types.StructType`
    354         """
--> 355         first = rdd.first()
    356         if not first:
    357             raise ValueError("The first row in RDD is empty, "

C:\spark\python\pyspark\rdd.py in first(self)
   1374         ValueError: RDD is empty
   1375         """
-> 1376         rs = self.take(1)
   1377         if rs:
   1378             return rs[0]

C:\spark\python\pyspark\rdd.py in take(self, num)
   1356 
   1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
   1359 
   1360             items += res

C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    999         # SparkContext#runJob.
   1000         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
   1003 

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
  File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Upvotes: 1

Views: 2275

Answers (1)

Sparker0i
Sparker0i

Reputation: 1861

You might want to do:

simple_data = sc.parallelize([[1, "Alice", 50]]).toDF()
simple_data.count()
simple_data.first()
simple_data.show()

Notice the change inside parallelize.

Upvotes: 1

Related Questions