Reputation: 21
from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local").setAppName("my App")
sc=SparkContext(conf=conf)
lines = sc.textFile("C:/Users/user/Downloads/learning-spark-master/learning-spark-master/README.md")
pythonLines = lines.filter(lambda line: "Python" in line)
pythonLines
pythonLines.first()
I am new to pyspark. I was trying to execute above code and I am getting following error after executing pythonLines(). Any help would be appreciated.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (LAPTOP-GAN836TE.fios-router.home executor driver): org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535) at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189) at java.net.ServerSocket.implAccept(ServerSocket.java:545) at java.net.ServerSocket.accept(ServerSocket.java:513) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174) ... 14 more
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242) at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535) at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189) at java.net.ServerSocket.implAccept(ServerSocket.java:545) at java.net.ServerSocket.accept(ServerSocket.java:513) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174) ... 14 more
Upvotes: 2
Views: 27625
Reputation: 1267
this error getting because of pyspark and python connection not established correctly.
Solution:
set PYSPARK_PYTHON
environment variable to python
.
Note: make sure to restart your cmd or shell after adding environment variables
Upvotes: 1
Reputation: 893
I ran into the same error in Chapter 7 in the "Data Science on GCP" book by author Valliappa Lakshmanan.
The author points this out in one of the logistic_regression.ipynb
cells by writing "if this is empty, change the shard you are using", but it's not clear that above error could be an indication of that.
Following their tip, simply change
inputs = 'gs://{}/flights/tzcorr/all_flights-00000-*'.format(BUCKET)
to something like (note the 1 instead of the 0 to select a different shard)
inputs = 'gs://{}/flights/tzcorr/all_flights-00001-*'.format(BUCKET)
You'd have to make an equivalent change further down to not test the model on the same data as you trained it on.
Upvotes: 0
Reputation: 2334
Based on the code , am not seeing anything wrong . Still you can analysis this issue based on the following data related .
Make sure 4th line lines
rdd has the data based on the collect()
.
make your after filter line #5 , you are not getting empty rdd
by using of isEmpty()
. ref : link
Same code I have ran for your reference as sample.
Upvotes: 0