Reputation: 892
In HDFS, i have a folder /landing/my_data/flow_name/
within this folder i have bunch of .csv
files :
I want to create a parquet file partitioned by the files name using the /landing/my_data/children_flow/
directory and put it into /staging/my_data/children_flow/
.
I want /staging/my_data/children_flow/
to look like this :
I thought about reading every file from /landing/my_data/children_flow
into a DataFrame, then adding the file name as a column and finally creating my parquet file which will be partitioned by the file_name
column.
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.read.csv(os.path.join("/landing/my_data/children_flow/", children_files[0]))
df = df.withColumn("file_name", lit(children_files[0].replace(".csv", "")))
children_files.pop(0)
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
But im not sure that is the best solution...Anyone can help me up with that ?
Upvotes: 0
Views: 1858
Reputation: 892
I did this :
children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
spark.sparkContext.emptyRDD(),
dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")
I feel like its better to use a emptyRDD()
.
Upvotes: 0