blue

Reputation: 21

dask performance decreases with increasing number of delayed objects passed to dask.compute with scheduler='processes'

When I create a list of delayed tasks and submit them to compute with scheduler='processes', dask performance decreases as the number of items on the list increases.

As I increase the number of elements in the list, I see a clear increase in overall runtime. I was expecting dask to improve performance up to some limit, but clearly there's something I don't understand. I have 10 directories of data, with each directory containing 40 files. I apply a set of parsers to the files to generate my output. Each parser reads a file into a pandas dataframe, does some analytics, and then writes the result to a file. Some instances run for 1-2 seconds and some run for 10-15 minutes, depending on the size of the files and the complexity of the parser. The parsers don't share any data at all; they just read, parse, and write to disk.

I'm new to dask, but when I tried using it to process these files in parallel, it actually slowed down the work as I increased the number of processes created.

Total run times ('step' is a parameter in the code below):

step = 1: 2104 seconds
step = 2: 1937 seconds
step = 5: 2894 seconds
step = 10: 3564 seconds

When I look at top, I see 6 processes operating with 99% CPU and 0.1% of memory. Each process only reads/writes to disk once, so that shouldn't cause any delays.

There is nothing else running on my linux box.


import dask
import pandas

for i in range(0, len(list_of_directories), step):
    process_files(list_of_directories[i:i+step])

def process_files(list_of_directories):
    parser_processes = []
    for each_directory in list_of_directories:
        files = read_files_in_directory(each_directory)
        for each_file in files:
            for parser in parser_list:
                # one delayed task per (file, parser) pair
                parser_processes.append(dask.delayed(parser)(each_file))
    dask.compute(*parser_processes, scheduler='processes')

def parser(file):
    df = pandas.read_csv(file)
    # <do stuff>
    # <write df to disk>


I should add that if I remove dask and just apply the parsers to all the files serially, the run time is 2917 seconds. Clearly dask is providing some improvement for smaller values of 'step', but it's actually worse for larger values of 'step'.

Why does dask performance get worse as the number of delayed items passed to compute increases? Since memory usage is so low, the results seem to imply that CPU resources are being thrashed. Am I thinking about this correctly?

Upvotes: 1

Views: 346

Answers (1)

MRocklin

Reputation: 57251

Performance could depend on many things. The first step is usually to profile your computation so that you can see what is going on. Parallel profiling is hard, but fortunately there are tools to help.
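
As a minimal sketch of local profiling, Dask's built-in diagnostics can record per-task timings and resource usage. The tasks list below is a hypothetical stand-in for the question's parser_processes:

import dask
from dask.diagnostics import Profiler, ResourceProfiler, visualize

# Hypothetical stand-in tasks; substitute the real delayed parser calls here.
tasks = [dask.delayed(sum)(range(n)) for n in range(1, 1000)]

# These profilers are built for the single-machine schedulers.
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
    dask.compute(*tasks)

visualize([prof, rprof])  # renders a bokeh timeline of task and CPU/memory usage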

I recommend Dask's Understanding Performance document in general, and the use of the dask distributed dashboard specifically (which also works great on a single machine).
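
A sketch of the dashboard route, assuming a single local machine: creating a dask.distributed Client starts a local cluster and makes it the default scheduler, and its dashboard can be watched in a browser while the computation runs.

import dask
from dask.distributed import Client

if __name__ == '__main__':
    client = Client()             # local cluster of worker processes on this machine
    print(client.dashboard_link)  # open in a browser, typically http://localhost:8787/status

    # With a Client active, dask.compute uses the distributed scheduler by default,
    # so the question's parser_processes list could be computed unchanged:
    # dask.compute(*parser_processes)

The dashboard's status page then shows each task's duration and each worker's CPU and memory use, which should help show where the extra time goes as 'step' grows.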

Upvotes: 1
