Reputation: 419
I am using the following code to read multiple CSV files, convert each one to a pandas DataFrame, and concatenate them into a single pandas DataFrame, which I finally convert back into a Spark DataFrame. I want to skip the conversion to pandas entirely and simply end up with a Spark DataFrame.
File Paths
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=1/*.csv
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=2/*.csv
......
Code
import pandas as pd
from pyspark.sql.utils import AnalysisException

pandas_dfs = []
for month in range(1, 3):
    for day in range(1, 31):
        for hour in range(0, 24):
            file_location = ("//xxxxxx/abc/year=2021/month=" + str(month) +
                             "/dayofmonth=" + str(day) + "/hour=" + str(hour) + "/*.csv")
            try:
                # read one hour's worth of CSV files, then convert to pandas
                spark_df = spark.read.format("csv").option("header", "true").load(file_location)
                pandas_df = spark_df.toPandas()
                pandas_dfs.append(pandas_df)
            except AnalysisException as e:
                print(e)

final_pandas_df = pd.concat(pandas_dfs)
df = spark.createDataFrame(final_pandas_df)
Upvotes: 1
Views: 5720
Reputation: 42332
You can load all the files at once and filter on the partition columns (Spark derives year, month, dayofmonth and hour from the directory names):
df = spark.read.format("csv").option("header", "true").load("abfss://xxxxxx/abc/").filter(
    'year = 2021 and month between 1 and 2 and dayofmonth between 1 and 30 and hour between 0 and 23'
)
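If you would rather read only the partitions you need in the first place, an alternative (just a sketch, reusing the placeholder abfss://xxxxxx/abc/ path from the question) is to pass load() a list of glob paths so Spark never lists the other directories:

paths = [
    f"abfss://xxxxxx/abc/year=2021/month={m}/dayofmonth=*/hour=*/*.csv"
    for m in range(1, 3)
]
df = spark.read.format("csv").option("header", "true").load(paths)

Note that when Spark is pointed at the leaf files like this, the year/month/dayofmonth/hour values are not added as columns automatically; set .option("basePath", "abfss://xxxxxx/abc/") if you still need them.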
Upvotes: 3