Reputation: 35
Hi, I am facing a problem with PySpark. df.show() still gives me a result, but when I use functions like count(), groupby(), etc., it shows me an error. I think the reason is that 'df' is too large. Please help me solve it. Thanks!
import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("box") \
    .config("spark.driver.memory", "25g") \
    .getOrCreate()

basepath = '/mnt/raw_data/play/log_stream/playstats_v100/topic=play_map_play_vod'
path = ['/mnt/raw_data/play/log_stream/playstats_v100/topic=play_map_play_vod/date=2021-01*']

# Read all January 2021 partitions under the base path
df = spark.read.option("basePath", basepath).parquet(*path)
df.count()
The error:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-321-3c9a60fd698f> in <module>()
----> 1 df.count()

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/dataframe.py in count(self)
    453         2
    454         """
--> 455         return int(self._jdf.count())
    456
    457     @ignore_unicode_prefix

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o2635.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 312 in stage 1079.0 failed 1 times, most recent failure: Lost task 312.0 in stage 1079.0 (TID 54105, localhost, executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: No such device or address
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:151)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:401)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:106)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:404)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:345)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No such device or address
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:255)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
    ... 32 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2775)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2774)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2774)
    at sun.reflect.GeneratedMethodAccessor369.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: No such device or address
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:163)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:151)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:401)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:106)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:404)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:345)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: No such device or address
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:255)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:156)
    ... 32 more
Upvotes: 0
Views: 10714
Reputation: 6082
You're using a wildcard in your path '/mnt/raw_data/play/log_stream/playstats_v100/topic=play_map_play_vod/date=2021-01*', so probably one of the matched files is corrupted. show() doesn't throw any error because it only reads the first few records, which means the data it does touch is basically correct, but it never reads all of the files. You can find which file is causing the error by reading the paths one by one (or a few at a time).
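
For example, a minimal sketch of that check, assuming the date=2021-01-* partition layout from the question and that the files are on the local filesystem as the stack trace suggests (the glob listing and the try/except loop are illustrative additions, not code from the original post):

import glob

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("box").getOrCreate()
basepath = '/mnt/raw_data/play/log_stream/playstats_v100/topic=play_map_play_vod'

# Expand the same wildcard outside Spark so each partition directory can be tested on its own.
for partition in sorted(glob.glob(basepath + '/date=2021-01*')):
    try:
        # count() forces a full scan of just this partition; a corrupted or
        # unreadable file will make this single read fail.
        rows = spark.read.option("basePath", basepath).parquet(partition).count()
        print("OK ", partition, rows)
    except Exception as err:
        print("BAD", partition, str(err)[:200])

Whichever partition prints "BAD" contains the problem file; you can then narrow it down further by reading the individual parquet files inside that directory the same way.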
Upvotes: 1