Rajesh Meher

Reputation: 57

iterate over files in pyspark from hdfs directory

I have a list of files in an HDFS directory and I would like to iterate over the files in PySpark, store each file in a variable, and use that variable for further processing. I am getting the error below.

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonUtils.toSeq. Trace: 
py4j.Py4JException: Method toSeq([class org.apache.hadoop.fs.Path]) does not exist

InputDir = "/Data/Ready/ARRAY_COUNTERS" # input hdfs directory.

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(InputDir)

for f in fs.get(conf).listStatus(path):
    Filename = f.getPath()

df = spark.read.csv(Filename, header=True)
# I am getting the above error while reading this file.

Upvotes: 3

Views: 4478

Answers (1)

Georgina Skibinski

Reputation: 13377

About these 2 lines:

    Filename = f.getPath()
    df = spark.read.csv(Filename, header=True)

getPath() does not return a string; it returns a Hadoop Path object, which is why spark.read.csv fails with the Py4J toSeq error above. Additionally, f could also be a directory, so to make sure you are not trying to load a directory, you can add a check with f.isFile():

if(f.isFile()):
    Filename = f.getPath()
    df = spark.read.csv(str(Filename), header=True)
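
For completeness, here is a minimal sketch of the full loop with that fix applied (assuming the same InputDir, sc, and spark session from the question); reading the CSV inside the loop gives you one DataFrame per file:

for f in fs.get(conf).listStatus(path):
    if(f.isFile()):
        Filename = f.getPath()
        # str() turns the Java Path object into a string Spark accepts
        df = spark.read.csv(str(Filename), header=True)
        # process df here, inside the loop, one file at a time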

An alternative, which worked for me, was:

if(f.isFile()):
    Filename = f.getPath()
    df = sc.textFile(str(Filename), 500).map(lambda x: x.split(", ")) # or any other separator; returns an RDD
    headers = df.first() # to infer schema - you can then convert it to a pyspark dataframe with specific column types
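
As a sketch of that last step (the variable names and the ", " separator here are illustrative, not from the answer): you can drop the header row and build a DataFrame from the remaining rows, then cast columns as needed:

if(f.isFile()):
    Filename = f.getPath()
    rdd = sc.textFile(str(Filename), 500).map(lambda x: x.split(", "))
    headers = rdd.first()
    # remove the header row, then reuse it as the column names
    data = rdd.filter(lambda row: row != headers)
    df = data.toDF(headers) # all columns are strings; cast with df.withColumn(...) as needed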

Upvotes: 3
