amulya349
amulya349

Reputation: 1238

Pyspark error when I do an action on Dataframe which I created manually

I have a python list of strings. I created a dataframe out of it with one column with this code:

skills_df = spark.createDataFrame(temp, StringType())

where, temp is the list of strings. This step was successfully executed.

When I try to do any action on skills_df like skills_df.count(), it gives me an error. It happens with this dataframe. But, not with a dataframe which I had created by importing csv file i.e. csv_df = spark.read.csv('/user/turing/Profiles_final.csv', header=True). I ran this using spark-submit. While to debug, I ran the same code in pyspark, I got the same error. But, when I did a csv_df.count(), even after the error occurred, it ran fine. Please help me with this error. Following is the stacktrace:

18/04/26 07:05:10 WARN org.apache.spark.scheduler.TaskSetManager: Stage 14 contains a task of very large size (215 KB). The maximum recommended task size is 100 KB.
18/04/26 07:05:11 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 14.0 (TID 658, spark-w-1.c.amulya.internal, executor 2): java.io.IOException: Cannot run program "/opt/conda/bin/python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 33 more

18/04/26 07:05:11 ERROR org.apache.spark.scheduler.TaskSetManager: Task 2 in stage 14.0 failed 4 times; aborting job
18/04/26 07:05:11 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
Traceback (most recent call last):
  File "/home/turing/mi/sample_job.py", line 95, in <module>
    skills = processing_methods.get_skills(company, position, company_df)
  File "/home/turing/mi/sample_job.py", line 72, in get_skills
    return skills_df.groupBy('value').count().head(5)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 972, in head
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 476, in take
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 438, in collect
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o134.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 14.0 failed 4 times, most recent failure: Lost task 2.3 in stage 14.0 (TID 667, spark-w-1.c.amulya.internal, executor 2): java.io.IOException: Cannot run program "/opt/conda/bin/python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 33 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot run program "/opt/conda/bin/python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 33 more

18/04/26 07:05:11 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 14.0 (TID 656, spark-w-0.c.amulya.internal, executor 4): TaskKilled (stage cancelled)
18/04/26 07:05:11 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@3af4a719{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/04/26 07:05:11 WARN org.apache.spark.rpc.netty.Dispatcher: Message RemoteProcessDisconnected(10.138.0.6:39486) dropped. Could not find OutputCommitCoordinator.

The spark is running on a google cloud dataproc cluster. Thanks.

EDIT 1:

following is the temp variable with its values:

temp = ['javascript', 'html', 'css', 'jquery', 'ajax', 'ruby on rails', 'agile', 'linux']

Upvotes: 0

Views: 2035

Answers (1)

Karthik Palaniappan
Karthik Palaniappan

Reputation: 1383

Init actions need to be run on all nodes of the cluster, not just the master. The driver started successfully because you had run the init action on the master, but then the job failed on executors because they did not have Conda installed.

In general, you should not run initialization actions manually. E.g. if you later add nodes to the cluster, you will need to run the script on the new nodes as well. However, if you specify initialization actions when creating a cluster, Dataproc will handle that for you.

You can specify init actions through the web console:

enter image description here

enter image description here

Note that if you want to specify metadata (flags) to the init actions, such as conda packages to install, you will need to use gcloud. The easiest way to do that is to start from "Equivalent command line" at the bottom of the create cluster page.

In general, I would suggest deleting and recreating your cluster if you want to want to add init actions or add flags. This is especially easy if your input data resides outside the cluster (e.g. Cloud Storage).

Upvotes: 1

Related Questions