Reputation: 77
I have run into an issue where my PySpark job fails intermittently. When I read all the parquet files from a directory using the code below, it throws the following exception:
File "C:\Python27\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 717, in read_int
    raise EOFError
EOFError
from os import path

# Build a glob pattern matching every parquet file in the input directory
inputPathWithData = []
if path.exists(input):
    inputPathWithData.append(input + "\\*.parquet")

if len(inputPathWithData) > 0:
    # Read all matching parquet files in a single call
    parquetFile = spark.read.parquet(*inputPathWithData)
    parquetFile.createOrReplaceTempView("imp_parquetFile")
    imp_df = spark.sql("SELECT * FROM imp_parquetFile as imp")
    imp_df.write.json(output_path + "Single", compression="gzip")
However, if I read the files individually, everything works as expected:
import os

i = 0
for input_file in os.listdir(input):
    i += 1
    # Read each parquet file on its own
    parquetFile = spark.read.parquet(input + input_file)
    parquetFile.createOrReplaceTempView("imp_parquetFile")
    try:
        imp_df = spark.sql("SELECT * FROM imp_parquetFile")
        imp_df.write.json(output_path + '_' + str(i), compression="gzip")
    except Exception:
        print("Issue with Filename: {0}".format(input_file))
The code to initialize Spark is:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("scratch_test") \
    .config("spark.debug.maxToStringFields", "100") \
    .config("spark.executor.memory", "10G") \
    .getOrCreate()
Upvotes: 0
Views: 801
Reputation: 770
You don't need to specify each and every parquet file. Just read the whole folder.
df = spark.read.parquet("/folder/")
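Applied to the setup in the question, a minimal sketch of that approach might look like the following; the directory names are placeholders standing in for your own `input` and `output_path` values, assuming the same local Windows layout:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("scratch_test").getOrCreate()

# Placeholders: replace with the actual directories used in the job
input_dir = "C:\\data\\parquet\\"
output_path = "C:\\data\\json\\out"

# Point spark.read.parquet at the directory itself; Spark discovers the files
df = spark.read.parquet(input_dir)
df.write.json(output_path + "Single", compression="gzip")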
Upvotes: 1