Jeroen Bos

Reputation: 97

Spark - loading many small CSV files takes very long

Description

At my workplace we have a large amount of data that needs processing. It concerns a rapidly growing number of instances (currently ~3000), each of which has a few megabytes' worth of data stored in gzipped CSV files on S3.

I have set up a Spark cluster and written a Spark script that does the following.

  1. For every instance:

    • load the data frame
    • run calculations
    • but do not save the dataframe yet (so no action is triggered, which I confirmed in the Spark job UI)
  2. Afterwards, I combine all the data frames into a single dataframe and save the result (therefore triggering an action). A rough sketch of this flow is shown below.
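
In simplified form (with placeholder paths and a stand-in `run_calculations`, not my actual code), the script does something like this:

```python
from functools import reduce
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.getOrCreate()

# Placeholder paths; the real data is ~3000 gzipped CSV files on S3.
instance_paths = [f"s3a://my-bucket/instances/{i}.csv.gz" for i in range(3000)]

def run_calculations(df):
    return df  # stands in for the real per-instance transformations (all lazy)

frames = []
for path in instance_paths:
    # Each of these reads takes 4-6 seconds and happens on the driver,
    # even though no action has been triggered yet.
    df = spark.read.option("header", "true").csv(path)
    frames.append(run_calculations(df))

# Combine everything and save; the write is the first action that actually runs.
result = reduce(DataFrame.unionByName, frames)
result.write.parquet("s3a://my-bucket/output/")
```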

Problem

The above works perfectly fine when I use a small number of instances, but I found the following problems:

  • when an instance file is loaded into a data frame, it takes 4-6 seconds without triggering any action
  • the loading of the dataframes happens on the driver
  • because of the above two, it takes nearly 2 hours to load the data frames (I optimized this a bit by using Python "threads")

Could someone explain to me what is causing the slow loading and advise me how to deal with this?

Maybe relevant information is that I am using the AWS s3a Hadoop file system. Also, the first part of my calculation is completely standalone per instance, which is why, among other reasons, I am a bit hesitant to combine all the input data into one gzipped CSV file.

Any help would be greatly appreciated; I am writing this after breaking my head over this problem until 5 in the morning.

Please let me know if I should provide more details.

Edit

Thanks for the comments. I am running Spark on Kubernetes, so I can't merge the files using Hadoop commands. However, I am pursuing the idea of merging the instance files.

Edit 2

Turns out I was using Spark in the completely wrong way. I thought I would make it easier for Spark by keeping the data separate, but that backfired. The best solution seems to be to aggregate your input files into larger ones, and to adapt your script so that the instances are still kept apart within them.
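
For anyone hitting the same issue, here is a rough sketch of what that looks like for me (placeholder bucket, file names and columns; it assumes all files share the same header):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# An explicit schema avoids a schema-inference pass over every file (placeholder columns).
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("value", DoubleType()),
])

df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    # One read over all (preferably aggregated into larger) files instead of ~3000 separate reads.
    .csv("s3a://my-bucket/instances/*.csv.gz")
    # Tag each row with its source file so the instances stay separable downstream.
    .withColumn("instance_id", F.input_file_name())
)
```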

Upvotes: 2

Views: 921

Answers (2)

niuer

Reputation: 1669

I would try the following methods:

  1. If each instance stays the same once it's created, and only the number of instances increases over time, I would:

    (1) Load all instances and combine them into one big DataFrame, adding an extra column to indicate the instance ID.

    (2) Save the big DataFrame

    (3) Going forward, maybe on a daily basis, the task is just to load the big DataFrame and the new instances, combine them, do the computation, and save the output. Since each instance's data has its own instance ID, you still have the capability to run parallel computations on them.

    (4) Also, when you load the CSV files, if all instances have the same header, try to specify the schema when loading. That might save you some time.

  2. This I have never tried myself, and I'm not sure if it works at all; just trying to brainstorm here :)

(1) You can create a DataFrame where one column is the instance ID and the other column is the address or file name of the corresponding instance.

(2) You then group by the instance ID, and in your UDF you load the CSV files. That way the CSV loading is distributed among the workers instead of all running on the driver.

(3) The groupby should return a combined DataFrame automatically. You can then proceed from it; see the sketch below.
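
A very rough sketch of idea 2 (it assumes Spark 3.x for `applyInPandas`, that the workers have pandas and s3fs installed so they can read S3 directly, and made-up paths and columns):

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One row per instance: its ID and where its file lives (made-up paths).
paths = spark.createDataFrame(
    [(1, "s3://my-bucket/instances/1.csv.gz"),
     (2, "s3://my-bucket/instances/2.csv.gz")],
    ["instance_id", "path"],
)

# Output schema of the loaded data; adjust to the real CSV columns.
out_schema = "instance_id INT, timestamp STRING, value DOUBLE"

def load_one(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each group holds exactly one path, so the CSV is read on a worker, not on the driver.
    data = pd.read_csv(pdf["path"].iloc[0], compression="gzip")
    data["instance_id"] = int(pdf["instance_id"].iloc[0])
    return data[["instance_id", "timestamp", "value"]]

# Gives back one combined DataFrame with all instances loaded in parallel.
combined = paths.groupBy("instance_id").applyInPandas(load_one, schema=out_schema)
```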

Hope it helps. Please let us know how you solve the problem.

Upvotes: 1

FraDel

Reputation: 184

Spark isn't really optimized to handle a lot of small files. I don't know if it is a possibility, but you could try to aggregate the small files into bigger ones; that might do the trick.

Upvotes: 1
