Reputation: 2551
I'm totally new to Spark. I'm using Spark version 2.3 and Python version 3.7, on Windows 10, by the way. I'm launching a Jupyter Notebook to perform the PySpark operations. I'm following a course on Pluralsight (Getting Started with Spark 2.0).
I'm launching pyspark in Jupyter using the below commands in Anaconda Command Prompt:
set PYSPARK_DRIVER_PYTHON=jupyter
set PYSPARK_DRIVER_PYTHON_OPTS=notebook
pyspark
After the notebook opens up:
I run the below commands:
sc
from pyspark.sql.types import Row
from datetime import datetime
simple_data = sc.parallelize([1, "Alice", 50])
simple_data
simple_data.count()
simple_data.first()
Now, here is where it fails: simple_data.first()
with the below error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-cc577dea1d9b> in <module>
----> 1 simple_data.first()
C:\spark\python\pyspark\rdd.py in first(self)
1374 ValueError: RDD is empty
1375 """
-> 1376 rs = self.take(1)
1377 if rs:
1378 return rs[0]
C:\spark\python\pyspark\rdd.py in take(self, num)
1356
1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
1359
1360 items += res
C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
999 # SparkContext#runJob.
1000 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
1003
C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
yield next(iterator)
StopIteration
There is a lot more error log than I have pasted here. I've looked up possible solutions and I've updated the Java JDK using conda install -c cyclus java-jdk
but even after that, nothing changed.
I'm kind of stuck and can't proceed with my course. Why does it work for .count()
but not for .first()?
How to resolve this error? What am I missing?
Adding full error message after trying out @Sparker0i's suggestion in the answer:
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-4dbbd81a7c5c> in <module>
2 #simple_data
3
----> 4 simple_data = sc.parallelize([[1, "Alice", 50]]).toDF()
5 simple_data.count()
6 simple_data.first()
C:\spark\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
56 [Row(name=u'Alice', age=1)]
57 """
---> 58 return sparkSession.createDataFrame(self, schema, sampleRatio)
59
60 RDD.toDF = toDF
C:\spark\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
685
686 if isinstance(data, RDD):
--> 687 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
688 else:
689 rdd, schema = self._createFromLocal(map(prepare, data), schema)
C:\spark\python\pyspark\sql\session.py in _createFromRDD(self, rdd, schema, samplingRatio)
382 """
383 if schema is None or isinstance(schema, (list, tuple)):
--> 384 struct = self._inferSchema(rdd, samplingRatio, names=schema)
385 converter = _create_converter(struct)
386 rdd = rdd.map(converter)
C:\spark\python\pyspark\sql\session.py in _inferSchema(self, rdd, samplingRatio, names)
353 :return: :class:`pyspark.sql.types.StructType`
354 """
--> 355 first = rdd.first()
356 if not first:
357 raise ValueError("The first row in RDD is empty, "
C:\spark\python\pyspark\rdd.py in first(self)
1374 ValueError: RDD is empty
1375 """
-> 1376 rs = self.take(1)
1377 if rs:
1378 return rs[0]
C:\spark\python\pyspark\rdd.py in take(self, num)
1356
1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
1359
1360 items += res
C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
999 # SparkContext#runJob.
1000 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
1003
C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
yield next(iterator)
StopIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\python\pyspark\rdd.py", line 1354, in takeUpToNumLeft
yield next(iterator)
StopIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Upvotes: 1
Views: 2275
Reputation: 1861
You might want to do:
simple_data = sc.parallelize([[1, "Alice", 50]]).toDF()
simple_data.count()
simple_data.first()
simple_data.show()
Notice the change inside parallelize: the row is now wrapped in an outer list.
Upvotes: 1