J. Miller

Reputation: 757

Spark 'saveAsTextFile' to S3: Can't control number of files with 'coalesce'

Using Python 3 with PySpark and Spark 1.6.0. I've read that the number of files created by saveAsTextFile() equals the number of RDD partitions. However, even though I explicitly coalesce the RDD to 16 partitions, only one file (part-00000.gz) is written to S3. What am I doing wrong?

Here's the code I'm using:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf()
sc = SparkContext(conf=conf)
sc.setLogLevel('WARN')
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', AWS_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', AWS_SECRET_KEY)
sqlContext = HiveContext(sc)

tbl = sqlContext.table(TABLE)
tbl.limit(1000000).toJSON().coalesce(16).saveAsTextFile(S3A_BUCKET_URL, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

The original TABLE is Parquet stored in about 11,000 files (which I assume corresponds to the number of Spark partitions?). When I don't use limit() and coalesce() on the whole table, it does attempt to create thousands of small files on S3, which takes a long time and just isn't necessary when I'd prefer fewer, larger files.

Upvotes: 0

Views: 627

Answers (1)

zero323

Reputation: 330093

This is because you use limit. As of now (there is an ongoing discussion on the developers list, so it may change in the future), limit repartitions all data to a single partition. Since coalesce can only decrease the number of partitions, it has no effect whatsoever here.

For performance reasons it is usually better to sample and then coalesce. For example:

from operator import truediv

df.cache()
n = ...  # Number of records to take
m = df.count()

df.sample(withReplacement=False, fraction=truediv(n, m))

but if you want an exact limit, you'll have to use repartition instead of coalesce.

Upvotes: 1
