Reputation: 178
I am trying to do some computation using UDFs. But after the computation, when I try to convert the PySpark DataFrame to pandas, it gives me
org.apache.spark.SparkException: Exception thrown in awaitResult:
Here is the reproducible code:
import pandas as pd
import numpy as np
import time
n = 10000
sample_df = pd.DataFrame(np.random.rand(n,n))  # ~800 MB of float64 values
sample_df.columns = sample_df.columns.astype(str)
sample_df.index = sample_df.index.astype(str)
sample_df.loc['start'] = np.random.rand(n)
sample_df.loc['null'] = np.random.rand(n)
sample_df.loc['conv'] = np.random.rand(n)
sample_df["start"] = 0.0
sample_df["null"] = np.random.rand(sample_df.shape[0])
sample_df["conv"] = np.random.rand(sample_df.shape[0])
sample_df.index.name = 'from'
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
channels = [channel for channel in sample_df.columns if channel not in ['start', 'null', 'conv']]
channels_df = spark.createDataFrame(channels, StringType()).toDF(*['channel'])
from pyspark.sql.functions import udf
@udf("float")
def removal_effects_udf(channel):
    global sample_df
    conversion_rate = 0.0313
    # drop the removed channel's row and column from the transition matrix
    removal_df = sample_df.drop(channel, axis=1).drop(channel, axis=0)
    # probability mass each remaining row loses after the removal
    row_sum = pd.DataFrame(float(1) - removal_df.sum(axis=1), columns=["value"])
    null_pct = row_sum[row_sum['value'] != 0].reset_index()
    null_pct.set_index('from', inplace=True)
    # redirect the lost mass to the 'null' (non-conversion) state
    removal_df['null'] = removal_df.index.to_series().map(null_pct['value']).fillna(removal_df['null'])
    removal_df.loc['null', 'null'] = 1.0  # avoid chained assignment, which may not modify the frame
    # split into transitions to the absorbing states and among the remaining states
    removal_to_conv = removal_df[['null', 'conv']].drop(['null', 'conv'], axis=0)
    removal_to_non_conv = removal_df.drop(['null', 'conv'], axis=1).drop(['null', 'conv'], axis=0)
    # absorption probabilities via the fundamental matrix (I - Q)^-1
    removal_inv_diff = np.linalg.inv(np.identity(len(removal_to_non_conv.columns)) - np.asarray(removal_to_non_conv))
    removal_dot_prod = np.dot(removal_inv_diff, np.asarray(removal_to_conv))
    # conversion probability when starting from the 'start' state
    removal_cvr = pd.DataFrame(removal_dot_prod, index=removal_to_conv.index)[[1]].loc['start'].values[0]
    removal_effect = 1 - removal_cvr / conversion_rate
    return float(removal_effect)
channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()
channels_df_pandas = channels_df.toPandas()
Having run this, I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1959014423699154> in <module>
----> 1 channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
106 # Rename columns to avoid duplicated column names.
107 tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108 batches = self.toDF(*tmp_column_names)._collect_as_arrow()
109 if len(batches) > 0:
110 table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
244 finally:
245 # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246 jsocket_auth_server.getResult()
247
248 # Separate RecordBatches from batch order indices in results
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o32146.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 160 in stage 1221.0 failed 4 times, most recent failure: Lost task 160.3 in stage 1221.0 (TID 161215, 10.0.1.18, executor 582): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
... 38 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:115)
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)
at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:62)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:62)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
... 38 more
How do I solve this issue?
I am using Databricks, with a cluster of 488 cores | 1.75 TB | Spark 3.0.0.
EDIT:
With the solution provided by user @wwnde, I still get an error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1959014423698939> in <module>
5
6 spark.conf.set("spark.sql.execution.arrow.enabled", "true")
----> 7 channels_df_pandas = channels_df.select("*").toPandas()
/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
106 # Rename columns to avoid duplicated column names.
107 tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108 batches = self.toDF(*tmp_column_names)._collect_as_arrow()
109 if len(batches) > 0:
110 table = pyarrow.Table.from_batches(batches)
/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
244 finally:
245 # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246 jsocket_auth_server.getResult()
247
248 # Separate RecordBatches from batch order indices in results
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o32697.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 1223.0 failed 4 times, most recent failure: Lost task 7.3 in stage 1223.0 (TID 161666, 10.0.1.26, executor 597): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 644, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/databricks/spark/python/pyspark/worker.py", line 463, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/databricks/spark/python/pyspark/worker.py", line 254, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/databricks/spark/python/pyspark/worker.py", line 76, in read_command
command = serializer.loads(command.value)
File "/databricks/spark/python/pyspark/broadcast.py", line 154, in value
self._value = self.load_from_path(self._path)
File "/databricks/spark/python/pyspark/broadcast.py", line 131, in load_from_path
return self.load(f)
File "/databricks/spark/python/pyspark/broadcast.py", line 137, in load
return pickle.load(file)
OSError: [Errno 12] Cannot allocate memory
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:585)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at scala.collection.AbstractIterator.to(Iterator.scala:1429)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 1
Views: 10456
Reputation: 87174
The main problem with your code is that you're using the toPandas function, which effectively brings all of your data to the driver node - the total amount of memory and cores in the cluster is irrelevant here, because the driver node size is the main bottleneck (of course you can increase the driver node size). I also see that you're referencing a global variable from inside the UDF - in theory it gets broadcast along with the UDF, but it's still bad practice.
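For illustration, a minimal sketch of both points - it reuses sample_df and channels exactly as built in the question, keeps the removal-effect math unchanged, ships the matrix to the workers as an explicit broadcast variable instead of a global, and writes the result out instead of collecting it (the output path is hypothetical):
# Sketch only: assumes sample_df and channels from the question already exist
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 1) ship the transition matrix to the workers as an explicit broadcast variable
sample_df_bc = spark.sparkContext.broadcast(sample_df)

@udf("float")
def removal_effects_udf(channel):
    local_df = sample_df_bc.value  # pandas DataFrame, available on the worker
    conversion_rate = 0.0313
    # same removal-effect computation as in the question, reading from the broadcast value
    removal_df = local_df.drop(channel, axis=1).drop(channel, axis=0)
    row_sum = pd.DataFrame(1.0 - removal_df.sum(axis=1), columns=["value"])
    null_pct = row_sum[row_sum["value"] != 0].reset_index().set_index("from")
    removal_df["null"] = removal_df.index.to_series().map(null_pct["value"]).fillna(removal_df["null"])
    removal_df.loc["null", "null"] = 1.0
    removal_to_conv = removal_df[["null", "conv"]].drop(["null", "conv"], axis=0)
    removal_to_non_conv = removal_df.drop(["null", "conv"], axis=1).drop(["null", "conv"], axis=0)
    removal_inv_diff = np.linalg.inv(np.identity(len(removal_to_non_conv.columns)) - np.asarray(removal_to_non_conv))
    removal_dot_prod = np.dot(removal_inv_diff, np.asarray(removal_to_conv))
    removal_cvr = pd.DataFrame(removal_dot_prod, index=removal_to_conv.index)[[1]].loc["start"].values[0]
    return float(1 - removal_cvr / conversion_rate)

channels_sdf = spark.createDataFrame(channels, StringType()).toDF("channel")
result_sdf = channels_sdf.withColumn("removal_effect", removal_effects_udf(F.col("channel")))

# 2) persist the (small) result instead of pulling it through the driver with toPandas()
result_sdf.write.mode("overwrite").parquet("/tmp/removal_effects")
Even then, broadcasting a 10000 x 10000 float matrix means shipping roughly 800 MB to every worker, so the recommendation below still applies.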
To really fix the problem you need to rework your approach to make your code completely distributed:
- avoid toPandas - it's better to write the results somewhere and access them another way; it looks like you simply have too much data.
Upvotes: 1
Reputation: 26676
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
channels_df_pandas = channels_df.select("*").toPandas()
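For the Spark 3.0.0 runtime mentioned in the question, spark.sql.execution.arrow.enabled is deprecated in favour of spark.sql.execution.arrow.pyspark.enabled, so the same snippet may also be written as:
# Spark 3.x name of the same Arrow setting (the legacy key still works, but is deprecated)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
channels_df_pandas = channels_df.select("*").toPandas()
Arrow only speeds up the transfer, though - toPandas still materialises the full result on the driver.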
Upvotes: 2