Reputation: 812
I have a Kaggle dataset that I want to stream one record at a time based on the Start_Time column and the End_Time column.
I created a temporary table with both columns unioned, converted them to UNIX timestamps, and scaled them to fit within the RUNTIME constant. I called this column Stream_Time. This means that every ID appears twice: once for the start time and once for the end time.
I then join the temporary table and the main dataframe on the ID column, so that when I iterate through the Stream_Time column I can send all the data directly, instead of doing a subquery for the ID from the main dataframe.
The code:
import findspark
findspark.init()

RUNTIME = 600  # Constant

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, split, broadcast
from datetime import datetime

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("USA Accidents Analysis with Pyspark") \
    .getOrCreate()

def toUnix(date):
    # This is to account for milliseconds, we don't care about such precision
    date = date.split('.')[0]
    date = datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
    return datetime.timestamp(date)

udfToUnix = udf(toUnix)

main_df = spark.read.csv('./US_Accidents_Dec21_updated.csv', header=True)
main_df = main_df.withColumn('Start_Time_Unix', udfToUnix(main_df['Start_Time'])
                             .cast(FloatType()))
main_df = main_df.withColumn('End_Time_Unix', udfToUnix(main_df['End_Time'])
                             .cast(FloatType()))

# Clean the ID column
main_df = main_df.withColumn('ID', split(main_df['ID'], '-').getItem(1).cast(IntegerType()))

temp_df = main_df.select('ID', 'Start_Time_Unix') \
    .union(main_df.select('ID', 'End_Time_Unix')) \
    .orderBy('Start_Time_Unix') \
    .orderBy('ID')
temp_df = temp_df.withColumn('Time_Unix', temp_df['Start_Time_Unix'])

earliest = temp_df.agg({'Time_Unix': "min"}).collect()
earliest = earliest[0][0]
latest = temp_df.agg({"Time_Unix": "max"}).collect()
latest = latest[0][0]

def scale(unix):
    return ((unix - earliest) / (latest - earliest)) * RUNTIME

udfScaling = udf(scale, FloatType())

temp_df = temp_df.withColumn('Stream_Time', udfScaling(temp_df['Time_Unix']))
temp_df = temp_df.withColumnRenamed('ID', 'temp_id')

to_delete = ('Start_Time_Unix', 'End_Time_Unix', 'Time_Unix', "temp_id")
stream_df = temp_df.join(broadcast(main_df), temp_df.temp_id == main_df.ID) \
    .drop(*to_delete).orderBy('Stream_Time')

stream_df.write.parquet('./Stream3.parquet')
It all goes well until the last line, which generates this huge error:
22/07/25 17:15:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 10:> (0 + 12) / 12]
[54.853s][warning][gc,alloc] Executor task launch worker for task 2.0 in stage 10.0 (TID 78): Retried waiting for GCLocker too often allocating 13796354 words
22/07/25 17:15:27 ERROR Executor: Exception in task 2.0 in stage 10.0 (TID 78)
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2634/0x0000000801956820.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.RDD$$Lambda$2631/0x0000000801947ba0.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2593/0x0000000801931968.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
22/07/25 17:15:28 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 2.0 in stage 10.0 (TID 78),5,main]
java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2634/0x0000000801956820.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.RDD$$Lambda$2631/0x0000000801947ba0.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2593/0x0000000801931968.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 2.0 in stage 10.0 (TID 78) (Fedora executor driver): java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2634/0x0000000801956820.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.RDD$$Lambda$2631/0x0000000801947ba0.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2593/0x0000000801931968.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
22/07/25 17:15:28 ERROR TaskSetManager: Task 2 in stage 10.0 failed 1 times; aborting job
22/07/25 17:15:28 WARN TaskSetManager: Lost task 5.0 in stage 10.0 (TID 81) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 4.0 in stage 10.0 (TID 80) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 ERROR FileFormatWriter: Aborting job ba6edbb7-65d8-4c72-ae0a-ce4d4eb21b06.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 1 times, most recent failure: Lost task 2.0 in stage 10.0 (TID 78) (Fedora executor driver): java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2634/0x0000000801956820.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.RDD$$Lambda$2631/0x0000000801947ba0.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2593/0x0000000801931968.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:431)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:137)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3537)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:100)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:130)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.sql.execution.SparkPlan$$Lambda$2634/0x0000000801956820.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.RDD$$Lambda$2631/0x0000000801947ba0.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2593/0x0000000801931968.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
... 3 more
22/07/25 17:15:28 WARN TaskSetManager: Lost task 8.0 in stage 10.0 (TID 84) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 11.0 in stage 10.0 (TID 87) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 7.0 in stage 10.0 (TID 83) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 10.0 in stage 10.0 (TID 86) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 3.0 in stage 10.0 (TID 79) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 77) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 9.0 in stage 10.0 (TID 85) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 6.0 in stage 10.0 (TID 82) (Fedora executor driver): TaskKilled (Stage cancelled)
22/07/25 17:15:28 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 76) (Fedora executor driver): TaskKilled (Stage cancelled)
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 47890)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
File "/usr/lib64/python3.10/socketserver.py", line 316, in _handle_request_noblock
self.process_request(request, client_address)
File "/usr/lib64/python3.10/socketserver.py", line 347, in process_request
self.finish_request(request, client_address)
File "/usr/lib64/python3.10/socketserver.py", line 360, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib64/python3.10/socketserver.py", line 747, in __init__
self.handle()
File "/home/mohamed/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
poll(accum_updates)
File "/home/mohamed/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
if func():
File "/home/mohamed/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 257, in accum_updates
num_updates = read_int(self.rfile)
File "/home/mohamed/.local/lib/python3.10/site-packages/pyspark/serializers.py", line 595, in read_int
raise EOFError
EOFError
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
File ~/.local/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
<class 'str'>: (<class 'ConnectionRefusedError'>, ConnectionRefusedError(111, 'Connection refused'))
During handling of the above exception, another exception occurred:
Py4JError Traceback (most recent call last)
Input In [1], in <cell line: 66>()
61 stream_df = temp_df.join(broadcast(main_df), temp_df.temp_id == main_df.ID) \
62 .drop(*to_delete).orderBy('Stream_Time')
64 stream_df.printSchema()
---> 66 stream_df.write.parquet('./Stream3.parquet')
File ~/.local/lib/python3.10/site-packages/pyspark/sql/readwriter.py:1140, in DataFrameWriter.parquet(self, path, mode, partitionBy, compression)
1138 self.partitionBy(partitionBy)
1139 self._set_opts(compression=compression)
-> 1140 self._jwrite.parquet(path)
File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:192, in capture_sql_exception.<locals>.deco(*a, **kw)
190 return f(*a, **kw)
191 except Py4JJavaError as e:
--> 192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
196 raise converted from None
File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:170, in convert_exception(e)
167 c: Py4JJavaError = e.getCause()
168 stacktrace: str = jvm.org.apache.spark.util.Utils.exceptionString(e)
169 if c is not None and (
--> 170 is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
171 # To make sure this only catches Python UDFs.
172 and any(
173 map(
174 lambda v: "org.apache.spark.sql.execution.python" in v.toString(), c.getStackTrace()
175 )
176 )
177 ):
178 msg = (
179 "\n An exception was thrown from the Python worker. "
180 "Please see the stack trace below.\n%s" % c.getMessage()
181 )
182 return PythonException(msg, stacktrace)
File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:464, in is_instance_of(gateway, java_object, java_class)
460 else:
461 raise Py4JError(
462 "java_class must be a string, a JavaClass, or a JavaObject")
--> 464 return gateway.jvm.py4j.reflection.TypeUtil.isInstanceOf(
465 param, java_object)
File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1722, in JVMView.__getattr__(self, name)
1719 _, error_message = get_error_message(answer)
1720 message = compute_exception_message(
1721 "{0} does not exist in the JVM".format(name), error_message)
-> 1722 raise Py4JError(message)
Py4JError: py4j does not exist in the JVM
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "/home/mohamed/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
I didn't install Spark from the Apache site; I just used pip install pyspark.
I have tried a lot of the solutions here on SO, including repartitioning, coalesce, and setting .config('spark.driver.memory','10g') on the SparkSession, but none of them worked, and setting values higher than 10g forcibly closes Jupyter/the terminal.
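For reference, I applied the driver memory setting roughly like this (the 10g value is simply the largest one my machine accepts):

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("USA Accidents Analysis with Pyspark") \
    .config('spark.driver.memory', '10g') \
    .getOrCreate()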
It doesn't need to be Parquet; I just need to save the result so that I can read it from another script.
Upvotes: 1
Views: 3412
Reputation: 2033
Your post covers several questions and optimizations:

"It all goes well until the last line, which generates this huge error"

The reason you only get the error message when you execute the last line of code is Spark's computation model. All RDDs in Spark support two kinds of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. All transformations in Spark are lazy and are only executed when an action requires a result; see the RDD operations guide for more detail: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations. In the code you provided, the only actions are the collect calls and write.parquet(...), so it is expected that the error only appears when the last line runs. It does not necessarily mean that the write.parquet(...) operation itself caused the OOM in Java heap space; you have to check the Spark logs to see which transformation actually causes it (see the short sketch below).
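As a minimal sketch of that laziness (reusing the session and the udfToUnix UDF from your code, purely for illustration), every line below only extends the query plan; nothing runs until the action at the end, which is where any upstream failure is reported:

# Sketch of lazy evaluation, reusing names from the question's code
df = spark.read.csv('./US_Accidents_Dec21_updated.csv', header=True)  # lazy: nothing is read yet
df = df.withColumn('Start_Time_Unix', udfToUnix(df['Start_Time']))    # lazy: only extends the plan

df.explain()  # prints the physical plan without executing anything

# Only an action triggers execution, so a failure in any transformation above
# shows up here; running a cheap action such as count() after each step is
# one way to narrow down which transformation blows up.
df.count()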
java.lang.OutOfMemoryError: Java heap space

This error does not necessarily mean that there is not enough memory for the driver; a lack of resources in the executors causes it too. Check whether your executor resources are sufficient, and also how many partitions you are using. Also, note that you are loading the data frame from csv rather than parquet, so check the number of records in each partition with df.groupBy(func.spark_partition_id()).count() to see whether too much data ends up in a single partition (a sketch of this check follows below).
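A minimal sketch of that partition check (assuming func is pyspark.sql.functions and main_df is the dataframe from your code):

from pyspark.sql import functions as func

# Count the records that land in each partition; a single partition holding
# far more rows than the rest is a common cause of per-task OOM errors.
main_df.groupBy(func.spark_partition_id().alias('partition_id')) \
    .count() \
    .orderBy('count', ascending=False) \
    .show()

# If the distribution is heavily skewed, repartitioning on a well-spread
# key before the join/write may help (200 is only an example value):
main_df = main_df.repartition(200, 'ID')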
These two posts contain a good discussion of partitioning and OOM debugging: Spark java.lang.OutOfMemoryError : Java Heap space and Spark java.lang.OutOfMemoryError: Java heap space

Upvotes: 1