ntj

Reputation: 335

Pyspark dataframe split alphabetically and write to S3

I am trying to split a huge XML file into small XML files using pyspark. I need the data to be written into buckets alphabetically.

For example, if the name starts with a, it should be written to the S3 path s3://bucket_name/a. If no name starts with b, a folder named b should still be created in the same bucket, i.e. s3://bucket_name/b.

So far the code I have is

from pyspark.sql.functions import col, lower, trim

characters = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"]

for c in characters:
    df1 = df.filter(lower(trim(col('name')).substr(1, 1)) == c)
    df1.write\
     .format("com.databricks.spark.xml")\
     .option("maxRecordsPerFile", 800)\
     .option("rootTag", "source")\
     .option("rowTag", "employees")\
     .mode("overwrite")\
     .save(f's3://split-files/{c}')

But this code takes a very long time to finish. Is there a better way to do this using data frames?

Thanks in advance.

Upvotes: 2

Views: 1061

Answers (2)

Rafa

Reputation: 527

The fastest way I see is to write with a partitionBy clause and process the whole dataset in a single go. The only drawback is that the folder names will be s3://bucket_name/char_name=a instead of the s3://bucket_name/a you are expecting; if you really want to stick to that layout, you could rename the folders afterwards (a rough boto3 sketch for the rename follows the code below).

df = df.withColumn('char_name', df['name'].substr(1, 1))
df.repartition("char_name").write\
     .partitionBy("char_name")\
     .format("com.databricks.spark.xml")\
     .option("maxRecordsPerFile", 800)\
     .option("rootTag", "source")\
     .option("rowTag", "employees")\
     .mode("overwrite")\
     .save('s3://split-files/')

There is no need to do a for loop.
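If you really do want plain letter folders (s3://split-files/a instead of s3://split-files/char_name=a), a rough sketch of the rename with boto3 could look like the following. S3 has no real rename, so each object is copied to a new key and the original is deleted; the bucket name and char_name= prefix are taken from the code above and may differ in your setup.

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("split-files")

# Move every object from char_name=<letter>/... to <letter>/...
for obj in bucket.objects.filter(Prefix="char_name="):
    new_key = obj.key.replace("char_name=", "", 1)
    bucket.copy({"Bucket": bucket.name, "Key": obj.key}, new_key)
    obj.delete()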

If you absolutely need every folder to be present, you can do a left outer join against the alphabet list to create records for all letters. I added an extra column, but you can drop or rename it as per your need (a DataFrame-API version of the same join is sketched after the code).

import string
from pyspark.sql.types import StringType

alphabet_string = string.ascii_lowercase
alphabet_list = list(alphabet_string)
df_list = spark.createDataFrame(alphabet_list, StringType())
df = df.withColumn('char_name', df['name'].substr(1, 1))
df.createOrReplaceTempView("data")
df_list.createOrReplaceTempView("missingalphabet")
df_final = spark.sql("select A.value as all_char_name, B.* from missingalphabet A left outer join data B on A.value = B.char_name")
df_final.repartition("all_char_name").write\
     .partitionBy("all_char_name")\
     .format("com.databricks.spark.xml")\
     .option("maxRecordsPerFile", 800)\
     .option("rootTag", "source")\
     .option("rowTag", "employees")\
     .mode("overwrite")\
     .save('s3://split-files/')
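The same join can also be written with the DataFrame API instead of temp views, if you prefer (a sketch under the same assumptions about df and its name column):

from pyspark.sql import functions as F

# Left join from the full alphabet so every letter gets at least one record,
# even when no name starts with it.
df_final = (
    df_list.withColumnRenamed("value", "all_char_name")
           .join(df, F.col("all_char_name") == F.col("char_name"), "left_outer")
)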

Upvotes: 1

User12345

Reputation: 5480

To reduce the time, use df.persist() before the for loop, as suggested by @Steven.
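A minimal sketch of that caching step, assuming the same df as in the question:

# Cache the parsed XML once so each filter in the loop below
# does not re-read and re-parse the source file.
df = df.persist()
df.count()  # action to materialize the cache before the loop

# when the loop has finished, release the cache:
df.unpersist()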

For the small-files issue you can use coalesce, but this is an expensive operation.

from pyspark.sql.functions import col, lower, trim

# characters is the same alphabet list as in the question
for c in characters:
    df1 = df.filter(lower(trim(col('name')).substr(1, 1)) == c)
    df1.coalesce(1).write\
     .format("com.databricks.spark.xml")\
     .option("maxRecordsPerFile", 800)\
     .option("rootTag", "source")\
     .option("rowTag", "employees")\
     .mode("overwrite")\
     .save(f's3://split-files/{c}')

This will create a single file in each folder (subject to the maxRecordsPerFile limit). You can change the number of files by passing a different value to coalesce.

Upvotes: 1
