Reputation: 57
I have a list of files in an HDFS directory, and I would like to iterate over the files in that directory from PySpark, store each file in a variable, and use that variable for further processing. I am getting the error below:
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonUtils.toSeq. Trace:
py4j.Py4JException: Method toSeq([class org.apache.hadoop.fs.Path]) does not exist
InputDir = "/Data/Ready/ARRAY_COUNTERS"
# input hdfs directory.
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(InputDir)
for f in fs.get(conf).listStatus(path):
    Filename = f.getPath()
    df = spark.read.csv(Filename, header=True)
    # I am getting the above error while reading this file.
Upvotes: 3
Views: 4478
Reputation: 13377
About these 2 lines:
Filename = f.getPath()
df = spark.read.csv(Filename,header=True)
getPath() does not return a string. Additionally, f could also be a directory, so to make sure you are not trying to load a directory, you can add a check with f.isFile():
if f.isFile():
    Filename = f.getPath()
    df = spark.read.csv(str(Filename), header=True)
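Putting the fix back into your listing loop, a minimal sketch (assuming the same sc and spark objects from your snippet) could look like this:

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path("/Data/Ready/ARRAY_COUNTERS")

for f in fs.get(conf).listStatus(path):
    if f.isFile():
        Filename = f.getPath()
        # str() turns the Java Path object into its "hdfs://..." string form,
        # which spark.read.csv accepts
        df = spark.read.csv(str(Filename), header=True)
        # ... further processing of df here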
An alternative which worked for me was:
if f.isFile():
    Filename = f.getPath()
    df = sc.textFile(str(Filename), 500).map(lambda x: x.split(", "))  # or any other separator; returns an RDD
    headers = df.first()  # to infer the schema - you can then convert it to a PySpark DataFrame with specific column types
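If it helps, a possible continuation for that conversion (just a sketch - it keeps every column as a string, so cast columns afterwards as needed):

    data = df.filter(lambda row: row != headers)  # drop the header row from the RDD
    sdf = data.toDF(headers)                      # use the header values as column names
    sdf.printSchema()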
Upvotes: 3