Deepak Gaur

Reputation: 173

Spark - Skewed Input dataframe

I am working with a heavily nested, non-splittable JSON-format input dataset housed in S3. The files vary widely in size: the smallest is 10 KB while others are 300 MB.

When reading the files using the code below, a simple repartition to the desired number of partitions leads to straggling tasks: most tasks finish within seconds, but one lasts a couple of hours and then runs into memory issues (missing heartbeats, heap space errors, etc.). I repartition in an attempt to randomize the partition-to-file mapping, since Spark may be reading files in sequence, and files within the same directory tend to be of the same nature (all large, all small, etc.).

df = spark.read.json('s3://my/parent/directory')
df = df.repartition(396)  # repartition returns a new DataFrame, so it must be assigned

# Settings (a few):
default parallelism = 396
total number of cores = 400

What I tried:

  1. I figured that the input partitioning scheme (S3 prefixes, not Spark partitions, i.e. the folder hierarchy) might be causing this skewed-partitions problem, since some S3 folders (technically 'prefixes') have just one file while others have thousands. So I transformed the input to a flattened directory structure using a hash code, where each folder has just one file (a sketch of the renaming appears after this list):

Earlier:

    /parent1
        /file1
        /file2
        ...
        /file1000

    /parent2
        /file1

Now:

   hashcode=FEFRE#$#$$#FE/parent1/file1
   hashcode=#$#$#Cdvfvf@#/parent1/file1

But it didn't have any effect.

  2. I have also tried really large clusters, thinking that even if there is input skew, that much memory should be able to handle the larger files. But I still run into straggling tasks.
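For reference, the renaming in (1) was along these lines (a minimal sketch; the MD5 digest and exact key layout here are illustrative, not my actual scheme):

import hashlib

def flattened_key(original_key: str) -> str:
    # Prepend a hash of the original key so objects spread evenly
    # across many S3 prefixes instead of clustering under a few folders.
    digest = hashlib.md5(original_key.encode()).hexdigest()[:12]
    return f"hashcode={digest}/{original_key}"

print(flattened_key('parent1/file1'))  # e.g. hashcode=0a1b2c3d4e5f/parent1/file1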

When I check the number of files assigned to each partition (each file becomes a single row in the dataframe due to its nested, unsplittable nature), I see between 2 and 32 files per partition. Is this because Spark packs files into partitions based on spark.sql.files.maxPartitionBytes, so it assigns only two files to a partition when the files are huge, and many more when the files are small?
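This is roughly how I checked the distribution, along with the settings involved (a minimal sketch; the values shown are just Spark's defaults, for illustration):

from pyspark.sql.functions import spark_partition_id

# Spark packs whole files into input partitions based on these settings;
# a higher openCostInBytes makes each file 'cost' more, so fewer small
# files get packed into a single partition.
spark.conf.set('spark.sql.files.maxPartitionBytes', str(128 * 1024 * 1024))  # default: 128 MB
spark.conf.set('spark.sql.files.openCostInBytes', str(4 * 1024 * 1024))      # default: 4 MB

df = spark.read.json('s3://my/parent/directory')

# Each file yields one row here, so counting rows per partition shows
# how many files each input partition received.
df.withColumn('pid', spark_partition_id()).groupBy('pid').count().orderBy('pid').show()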

Any recommendations to make the job work properly and distribute the tasks uniformly, given that the size of the input files cannot be changed due to the nature of the input?

EDIT: Code added for latest trial per Abdennacer's comment

This is the entire code.

Results: the job gets stuck in Running, even though the worker logs show the task finished. The driver log has the error 'cannot increase buffer size'; I don't know what is causing the 2 GB buffer issue, since I am not issuing any 'collect' or similar statement.

Configuration for this job is as below, to ensure the executor has huge memory per task.

Driver/Executor cores: 2

Driver/Executor memory: 198 GB


# sample S3 source path:
# s3://masked-bucket-urcx-qaxd-dccf/gp/hash=8e71405b-d3c8-473f-b3ec-64fa6515e135/group=34/db_name=db01/2022-12-15_db01.json.gz

from pyspark.sql import SparkSession

#####################
# Boilerplate
#####################
spark = SparkSession.builder.appName('query-analysis').getOrCreate()

#####################
# Queries Dataframe
#####################
# S3_STL_RAW and S3_DEST_PATH are the source and destination paths.
# primitivesAsString reads all JSON primitives as strings, avoiding
# type-inference conflicts across files.
dfq = spark.read.json(S3_STL_RAW, primitivesAsString="true")
dfq.write.partitionBy('group').mode('overwrite').parquet(S3_DEST_PATH)

Upvotes: 1

Views: 786

Answers (1)

Matt Andruff

Reputation: 5125

Great job flattening the files to increase read speed. Prefixes, as you seem to understand, relate to buckets: bucket read speed scales with the number of files under each prefix and their size. The approach you took will speed up reads compared to your original layout, but it will not help with skew in the data itself.

One thing you might consider is that your raw data and your working data do not need to be the same set of files. A common strategy is to land the data, then pre-process it for performance.

That is to say: keep the raw data in the format you have now, then make a copy of it in a format more convenient for your regular queries. (Parquet is the best choice for working with S3.)

  1. Land data in a 'landing zone'.
  2. As needed, process the data stored in the landing zone into a convenient splittable format for querying (a 'pre-processed' folder); see the sketch after this list.
  3. Once your raw data is processed, move it to a 'processed' folder. (Use your existing flat folder structure.) Keeping this processed raw data is important should you need to rebuild the table or change the table format.
  4. Create a view that is a union of the data in the 'landing zone' and the 'pre-processed' folder. This gives you a performant table with up-to-date data.
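A minimal sketch of steps 2 and 4 (the paths, view name, and session setup are illustrative assumptions, not your actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('preprocess').getOrCreate()

LANDING = 's3://my-bucket/landing/'            # raw, non-splittable JSON
PREPROCESSED = 's3://my-bucket/preprocessed/'  # splittable working copy

# Step 2: convert the raw JSON into splittable Parquet.
spark.read.json(LANDING).write.mode('append').parquet(PREPROCESSED)

# Step 4: a view unioning the not-yet-processed raw data with the
# pre-processed copy, so queries always see the full dataset.
raw_df = spark.read.json(LANDING)
prepped_df = spark.read.parquet(PREPROCESSED)
raw_df.unionByName(prepped_df, allowMissingColumns=True) \
      .createOrReplaceTempView('full_data')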

If you are using S3 today you get consistent reads, which lets you ensure you are querying all of the data. In the past S3 was only eventually consistent, meaning you might miss some data while it was in transit; S3 now provides strong read-after-write consistency. Run this 'processing' as often as needed and you should have a performant table to run large queries on.

S3 was designed as cheap long-term storage. It's not made to perform quickly, but it has been getting better over time.

It should be noted that this solves skew on read but won't solve skew during a query. To address the query portion you can enable Adaptive Query Execution (AQE) in your config. (It adaptively re-optimizes the plan at shuffle boundaries, e.g. splitting skewed partitions, to make queries run faster.)

spark.sql.adaptive.enabled=true
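For example, a minimal sketch of enabling it when building the session (the skew-join flag is a related AQE option in Spark 3.x, shown here as an extra suggestion):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('query-analysis')
         .config('spark.sql.adaptive.enabled', 'true')
         # Splits skewed shuffle partitions into smaller ones during joins.
         .config('spark.sql.adaptive.skewJoin.enabled', 'true')
         .getOrCreate())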

Upvotes: 2
