Fragan
Fragan

Reputation: 892

PySpark write parquet from directory partitionned by file name

In HDFS, i have a folder /landing/my_data/flow_name/ within this folder i have bunch of .csv files :

I want to create a parquet file partitioned by the files name using the /landing/my_data/children_flow/ directory and put it into /staging/my_data/children_flow/.

I want /staging/my_data/children_flow/ to look like this :

I thought about reading every file from /landing/my_data/children_flow into a DataFrame, then adding the file name as a column and finally creating my parquet file which will be partitioned by the file_name column.

children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.read.csv(os.path.join("/landing/my_data/children_flow/", children_files[0]))
df = df.withColumn("file_name", lit(children_files[0].replace(".csv", "")))
children_files.pop(0)
for one_file in children_files :
    df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
    df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
    df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

But im not sure that is the best solution...Anyone can help me up with that ?

Upvotes: 0

Views: 1858

Answers (1)

Fragan
Fragan

Reputation: 892

I did this :

children_files = hdfs.list("/landing/my_data/children_flow/")
df = spark.createDataFrame(
            spark.sparkContext.emptyRDD(),
            dfSchema
)
df = df.withColumn("file_name", lit(None))
for one_file in children_files :
    df2 = spark.read.csv(os.path.join("/landing/my_data/children_flow/", one_file))
    df2 = df2.withColumn("file_name", lit(one_file.replace(".csv", "")))
    df = df.union(df2)
df.write.partitionBy("file_name").parquet("/staging/my_data/children_flow/")

I feel like its better to use a emptyRDD().

Upvotes: 0

Related Questions