Reputation: 349
I am trying to convert a Spark DataFrame to pandas and I am encountering the following error:
databricks/spark/python/pyspark/sql/pandas/conversion.py:145: UserWarning: toPandas attempted
Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has
reached the error below and can not continue. Note that
'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in
the middle of computation.
An error occurred while calling o466.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:107)
at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage
43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188-
62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
at org.apache.spark.scheduler.DAGScheduler
at org.apache.spark.scheduler.DAGScheduler
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2476)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2459)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2571)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3761)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3765)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3731)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3825)
at org.apache.spark.sql.execution.SQLExecution$:130)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3823)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3731)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3730)
at org.apache.spark.security.SocketAuthServer$
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1605)
at org.apache.spark.security.SocketAuthServer$
at org.apache.spark.security.SocketAuthServer$.
at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:117)
at org.apache.spark.security.SocketAuthServer$$a
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:70)
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$19(Executor.scala:859)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5401/964020024.apply(Unknown Source)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:859)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$5281/859288619.apply$mcV$sp(Unknown Source)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
warnings.warn(msg)
Below is what the data frame/code looks like (the dataset is formed by joining several different datasets).
Upvotes: 1
Views: 12072
Reputation: 41
One trick that works much better for moving data from a PySpark DataFrame to a pandas DataFrame is to avoid the collect through the JVM altogether. It is much faster to write the data to disk or cloud storage as Parquet and read it back with pandas.read_parquet. This sidesteps the out-of-memory failure in the JVM collect and reduces memory consumption and time, although the data still has to fit in memory on the pandas side. It is very fast even with hundreds of millions of rows.
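A minimal sketch of that round trip, assuming a storage location that both Spark and the pandas process can reach. The helper name `spark_to_pandas_via_parquet`, its parameters, and the example paths are hypothetical. On Databricks the same files are typically addressed by Spark as `dbfs:/...` and by local Python as `/dbfs/...`, which is why the sketch accepts two paths.

```python
def spark_to_pandas_via_parquet(spark_df, spark_path, local_path=None, reader=None):
    """Write a Spark DataFrame to Parquet, then load the files with pandas.

    spark_path: where Spark writes (hypothetical example: "dbfs:/tmp/export").
    local_path: the same location as seen by pandas (e.g. "/dbfs/tmp/export");
                defaults to spark_path when both sides share one filesystem.
    reader:     callable used to load the files; defaults to pandas.read_parquet.
    """
    if reader is None:
        import pandas as pd  # read_parquet needs pyarrow or fastparquet installed
        reader = pd.read_parquet

    # The Spark tasks write the Parquet files directly, so the rows are
    # never collected to the driver the way toPandas() collects them.
    spark_df.write.mode("overwrite").parquet(spark_path)

    # pandas still loads everything into local memory, so the data has to fit.
    return reader(local_path or spark_path)
```

With a Spark DataFrame `df`, a call might look like `pdf = spark_to_pandas_via_parquet(df, "dbfs:/tmp/export", "/dbfs/tmp/export")`.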
Upvotes: 0
Reputation: 2169
There are some workarounds, but they don't help if the dataset is just too big for your master node.
You could use
import pandas as pd
df.coalesce(1).write.save(..., format='parquet')  # one partition, so one part file
pdf = pd.read_parquet(...)  # read it back with pandas
Or
pdf = df.coalesce(1).toPandas()
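coalesce(1) merges the data into a single partition without a full shuffle, so there is less data movement when the pandas DataFrame is created. Note that all rows then pass through a single task, so that task still needs enough memory to hold them.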
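Whether any of these workarounds can succeed depends on whether the full result fits in memory on one machine. A rough way to check first is to convert a small sample and extrapolate, as sketched below. The function name and the sample size are hypothetical, and the estimate assumes rows are roughly uniform in size.

```python
def estimate_pandas_bytes(sample_bytes, sample_rows, total_rows):
    """Extrapolate the in-memory size of the full pandas DataFrame from a sample."""
    if sample_rows <= 0:
        raise ValueError("sample_rows must be positive")
    return sample_bytes / sample_rows * total_rows


# Intended use with a Spark DataFrame `df` (not executed here):
#   sample_pdf = df.limit(10_000).toPandas()
#   sample_bytes = sample_pdf.memory_usage(deep=True).sum()
#   estimate = estimate_pandas_bytes(sample_bytes, len(sample_pdf), df.count())

# Worked example: a 10,000-row sample occupying 8 MB, 50 million rows in total.
estimate = estimate_pandas_bytes(8_000_000, 10_000, 50_000_000)
print(f"{estimate / 1e9:.1f} GB")  # 40.0 GB
```

If the estimate is close to or above the memory available on the driver, reducing the data in Spark first is likely the safer option.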
Upvotes: 1
Reputation: 2936
In the traceback it says:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage
43.0 failed 1 times, most recent failure: Lost task 0.0 in stage 43.0 (TID 97) (ip-10-172-188-
62.us-west-2.compute.internal executor driver): java.lang.OutOfMemoryError: Java heap space
Note: java.lang.OutOfMemoryError: Java heap space
You ran out of memory. That's causing the issue.
This is expected, since toPandas() gathers all the data onto a single machine (the driver), as noted in the documentation:
Note that converting pandas-on-Spark DataFrame to pandas requires to collect all the data into the client machine; therefore, if possible, it is recommended to use pandas API on Spark or PySpark APIs instead.
To learn more about pandas-on-Spark DataFrame, pandas DataFrame and conversion between pyspark Dataframe please see: From/to pandas and PySpark DataFrames.
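In practice, the documentation's advice often means shrinking the data with Spark before converting it. The sketch below aggregates in Spark and converts only the summary; the function name and the column names are hypothetical. Keeping the data distributed with the pandas API on Spark (for example `DataFrame.pandas_api()` in Spark 3.3 and later) is another route.

```python
def summarize_then_to_pandas(spark_df, key_col, value_col):
    """Aggregate in Spark, then convert only the (much smaller) result to pandas."""
    # groupBy(...).sum(...) runs on the cluster; the result has one row per key.
    summary = spark_df.groupBy(key_col).sum(value_col)
    # Only the aggregated rows are collected to the driver here.
    return summary.toPandas()
```

For example, `summarize_then_to_pandas(df, "country", "amount")` would return one pandas row per distinct `country` value instead of every row of `df`.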
Upvotes: 2