Reputation: 173
I am working with a heavily nested, non-splittable JSON input dataset stored in S3. File sizes vary widely: the smallest is about 10 KB, while others are as large as 300 MB.
When I read the files with the code below and then simply repartition to the desired number of partitions, I get straggling tasks: most tasks finish within seconds, but one runs for a couple of hours and then hits memory issues (missed heartbeats, heap space errors, etc.). I repartition in an attempt to randomize the partition-to-file mapping, since Spark may be reading files in sequence and files within the same directory tend to be similar (all large, all small, etc.).
df = spark.read.json('s3://my/parent/directory')
df = df.repartition(396)  # repartition returns a new DataFrame, so keep the result
# Settings (few):
default parallelism = 396
total number of cores = 400
/parent1
/file1
/file2
.
.
/file1000
/parent2/
/file1
hashcode=FEFRE#$#$$#FE/parent1/file1
hashcode=#$#$#Cdvfvf@#/parent1/file1
But it didn't have any effect.
When I check the number of files assigned to each partition (each file becomes one row in the DataFrame because of its nested, unsplittable nature), I see between 2 and 32 files per partition. Is that because Spark packs files into partitions based on spark.sql.files.maxPartitionBytes, so it assigns only two files to a partition when the files are huge, and many more to a single partition when the files are small?
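To reason about that guess, here is a minimal pure-Python sketch of how Spark appears to pack whole files into read partitions. It is a simplified model, not Spark's actual code: it assumes the default values of spark.sql.files.maxPartitionBytes (128 MB) and spark.sql.files.openCostInBytes (4 MB), treats every file as unsplittable, and the function names max_split_bytes and pack_files are hypothetical.

```python
MB = 1024 * 1024


def max_split_bytes(file_sizes, default_parallelism,
                    max_partition_bytes=128 * MB, open_cost_bytes=4 * MB):
    """Approximate target partition size, capped by maxPartitionBytes."""
    # Each file is charged its length plus a fixed "open cost".
    total = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total // default_parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def pack_files(file_sizes, split_bytes, open_cost_bytes=4 * MB):
    """Pack unsplittable files, largest first, closing a partition once full."""
    partitions, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition if this file would overflow it.
        if current and current_size + size > split_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size + open_cost_bytes
    if current:
        partitions.append(current)
    return partitions


# Made-up input: two 300 MB files and one hundred 10 KB files.
sizes = [300 * MB] * 2 + [10 * 1024] * 100
split = max_split_bytes(sizes, default_parallelism=4)
print([len(p) for p in pack_files(sizes, split)])  # [1, 1, 32, 32, 32, 4]
```

In this model a file larger than the split size ends up alone in its partition, and the 4 MB open cost caps a 128 MB partition at 32 tiny files, which would be consistent with the upper bound of 32 seen above.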
Any recommendations for making the job work properly and distributing the tasks uniformly? The sizes of the input files cannot be changed, given the nature of the input.
This is the entire code.
Results: the job gets stuck in Running, even though the worker logs show that the task finished. The driver log has the error 'can not increase buffer size'. I don't know what is causing the 2 GB buffer issue, since I am not issuing any 'collect' or similar statement.
The job is configured as below to ensure each executor has plenty of memory per task.
Driver/Executor Cores: 2.
Driver/ Executor memory: 198 GB.
# sample s3 source path:
s3://masked-bucket-urcx-qaxd-dccf/gp/hash=8e71405b-d3c8-473f-b3ec-64fa6515e135/group=34/db_name=db01/2022-12-15_db01.json.gz
#####################
# Boilerplate
#####################
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('query-analysis').getOrCreate()
#####################
# Queries Dataframe
#####################
dfq = spark.read.json(S3_STL_RAW, primitivesAsString="true")
dfq.write.partitionBy('group').mode('overwrite').parquet(S3_DEST_PATH)
Upvotes: 1
Views: 786
Reputation: 5125
Great job flattening the files to increase read speed. As you seem to understand, prefixes are tied to buckets, and bucket read speed depends on the number of files under each prefix and their size. The approach you took will read faster than your original strategy. It will not help you with skew in the data itself.
One thing you might consider is that your raw data and working data do not need to be the same set of files. There is a strategy for landing data and then pre-processing it for performance.
That is to say, keep the raw data in the format that you have now, then make a copy of the data in a more convenient format for regular queries. (Parquet is the best choice for working with S3.)
Then create a view that is a union of the data in the 'landing zone' and the 'pre-processed' folder. This gives you a performant table with up-to-date data. S3 has provided strong read-after-write consistency since December 2020, so you can be sure you are querying all the data. Before that, S3 was eventually consistent, meaning you might miss some data while it was still in transit. Run this 'processing' as often as needed and you should have a performant table to run large queries on.
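The union view described above could be sketched in PySpark roughly as follows. This is only an illustration under assumptions: the function name build_combined_view, the default view name, and both path arguments are hypothetical, the landing zone is assumed to hold the raw JSON and the pre-processed folder Parquet, and unionByName(..., allowMissingColumns=True) needs Spark 3.1 or later.

```python
def build_combined_view(spark, landing_path, processed_path,
                        view_name="queries_all"):
    """Register a temp view over pre-processed Parquet plus fresh raw JSON.

    `spark` is an existing SparkSession; all names here are hypothetical.
    """
    # Bulk of the data: already converted to Parquet by the pre-processing job.
    processed = spark.read.parquet(processed_path)
    # Recent arrivals: still in the raw JSON landing zone.
    landing = spark.read.json(landing_path, primitivesAsString=True)
    # Match columns by name; columns missing on one side are filled with nulls.
    combined = processed.unionByName(landing, allowMissingColumns=True)
    combined.createOrReplaceTempView(view_name)
    return combined
```

Column types still have to agree on both sides (for example, partition columns inferred from the Parquet paths), so a cast may be needed before the union. Once a batch has been pre-processed, its files should be moved out of the landing zone so the view does not count them twice.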
S3 was designed as cheap long-term storage. It was not built for speed, although AWS has been improving its performance over time.
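It should be noted that this will solve skew on read but won't solve skew on query. To address the query side, you can enable Adaptive Query Execution (AQE) in your config. (AQE re-optimizes the plan at runtime using shuffle statistics, for example by coalescing small shuffle partitions and splitting skewed partitions in joins, to make queries run faster.)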
spark.sql.adaptive.enabled=true
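If you would rather set this from code than from a config file, a minimal sketch could look like the one below. The helper name apply_aqe and the AQE_SETTINGS dictionary are hypothetical; the two extra keys are standard Spark 3.x AQE options that I am assuming you would also want on. AQE is already enabled by default from Spark 3.2.0 onward, so this mostly matters on older 3.x versions.

```python
# Hypothetical bundle of AQE-related settings (keys are real Spark 3.x options).
AQE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    # Merge many small shuffle partitions into fewer, larger ones.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Split heavily skewed partitions in sort-merge joins.
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def apply_aqe(spark, settings=AQE_SETTINGS):
    """Apply the settings to an existing SparkSession at runtime."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return spark
```

Calling apply_aqe(spark) right after getOrCreate() is enough, since these are runtime SQL settings.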
Upvotes: 2