Reputation: 335
I am trying to split a huge XML file into small XML files using pyspark, and I need the data to be written into folders alphabetically.
For example, if the name starts with a, the record should be written to s3://bucket_name/a. If there is no name that starts with b, a folder named b should still be created in the same bucket, i.e. s3://bucket_name/b.
So far the code I have is:
from pyspark.sql.functions import col, lower, trim

characters = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
              "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"]

for c in characters:
    # Keep only the rows whose name starts with the current letter.
    df1 = df.filter(lower(trim(col('name')).substr(1, 1)) == c)
    df1.write \
        .format("com.databricks.spark.xml") \
        .option("maxRecordsPerFile", 800) \
        .option("rootTag", "source") \
        .option("rowTag", "employees") \
        .mode("overwrite") \
        .save(f's3://split-files/{c}')
But this code takes a very long time to finish. Is there a better way to do this using data frames?
Thanks in advance.
Upvotes: 2
Views: 1061
Reputation: 527
The fastest way I see is to write with a partitionBy clause and process the whole data in a single go. The only drawback is that the folder name will be s3://bucket_name/char_name=a instead of the s3://bucket_name/a you are expecting; you could rename the folders afterwards if you really want to stick to that naming.
df = df.withColumn('char_name', df['name'].substr(1, 1))
df.repartition("char_name").write \
    .format("com.databricks.spark.xml") \
    .option("maxRecordsPerFile", 800) \
    .option("rootTag", "source") \
    .option("rowTag", "employees") \
    .partitionBy("char_name") \
    .mode("overwrite") \
    .save('s3://split-files/')
There is no need to do a for loop.
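If you do want to turn char_name=a into plain a afterwards, one option is a small post-processing step outside Spark. A minimal sketch with boto3 (assuming the bucket is named split-files and the written objects have keys like char_name=a/part-0000.xml; S3 has no real folders, so a "rename" is a copy plus delete):
import boto3

s3 = boto3.client("s3")
bucket = "split-files"

for c in "abcdefghijklmnopqrstuvwxyz":
    old_prefix = f"char_name={c}/"
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=old_prefix):
        for obj in page.get("Contents", []):
            old_key = obj["Key"]
            new_key = old_key.replace(old_prefix, f"{c}/", 1)
            # Copy the object under the new prefix, then delete the original.
            s3.copy_object(Bucket=bucket,
                           CopySource={"Bucket": bucket, "Key": old_key},
                           Key=new_key)
            s3.delete_object(Bucket=bucket, Key=old_key)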
If there is an absolute need for every folder to be present, you can do a left outer join against the alphabet list to create records for all letters. I added an extra column, but you can drop or rename it as per your need.
import string
from pyspark.sql.types import StringType

alphabet_string = string.ascii_lowercase
alphabet_list = list(alphabet_string)
df_list = spark.createDataFrame(alphabet_list, StringType())

df = df.withColumn('char_name', df['name'].substr(1, 1))
df.createOrReplaceTempView("data")
df_list.createOrReplaceTempView("missingalphabet")

df_final = spark.sql("select A.value as all_char_name, B.* from missingalphabet A left outer join data B on A.value = B.char_name")

df_final.repartition("all_char_name").write \
    .format("com.databricks.spark.xml") \
    .option("maxRecordsPerFile", 800) \
    .option("rootTag", "source") \
    .option("rowTag", "employees") \
    .partitionBy("all_char_name") \
    .mode("overwrite") \
    .save('s3://split-files/')
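The same left outer join can also be written with the DataFrame API instead of temp views, if you prefer; a rough equivalent of the SQL above (using the value and char_name columns from the code):
from pyspark.sql import functions as F

# Keep every letter from df_list and attach the matching rows from df
# (the data columns come back as nulls for letters that have no records).
df_final = (df_list.withColumnRenamed("value", "all_char_name")
            .join(df, F.col("all_char_name") == df["char_name"], "left_outer"))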
Upvotes: 1
Reputation: 5480
To reduce the time, use df.persist() before the for loop, as suggested by @Steven.
For the small-files issue you can use coalesce, but that is an expensive operation.
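Concretely, the caching step looks like this (a sketch; df is the same DataFrame as in the question), placed immediately before the loop:
# Cache df once so each of the 26 filter-and-write passes below reuses the
# parsed data instead of re-reading the source XML on every iteration.
df.persist()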
for c in characters:
    df1 = df.filter(lower(trim(col('name')).substr(1, 1)) == c)
    df1.coalesce(1).write \
        .format("com.databricks.spark.xml") \
        .option("maxRecordsPerFile", 800) \
        .option("rootTag", "source") \
        .option("rowTag", "employees") \
        .mode("overwrite") \
        .save(f's3://split-files/{c}')
This will create only one file in each folder. You can change the number of output files per folder by passing a different value to coalesce.
Upvotes: 1