gumpcraca

Reputation: 31

Dask Scheduler Memory

Our dask-scheduler process seems to balloon in memory as time goes on and executions continue. Currently we see it using 5 GB of memory, which seems high since all the data is supposedly living on the worker nodes:

  PID   USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
  31172 atoz      20   0 5486944 5.071g   7100 S 23.8 65.0  92:38.64 dask-scheduler

When starting up the scheduler we are below 1 GB of memory use. Restarting the cluster with a client.restart() doesn't seem to help; only killing the scheduler process itself and restarting it frees up the memory.

What is the expected memory usage per executed task? Is the scheduler really only maintaining pointers to which worker holds each future's result?

----edit----

I think my main concern here is why a client.restart() doesn't seem to release the memory being used by the scheduler process. I'm obviously not expecting it to release all memory, but I do expect it to get back to a base level. We are using client.map to execute our function across a list of different inputs. After executing, then doing a client.restart() over and over and taking snapshots of our scheduler memory, we see the following growth:

  PID   USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
27955 atoz      20   0  670556 507212  13536 R 43.7  6.2   1:23.61 dask-scheduler
27955 atoz      20   0  827308 663772  13536 S  1.7  8.1  16:25.85 dask-scheduler
27955 atoz      20   0  859652 696408  13536 S  4.0  8.5  19:18.04 dask-scheduler
27955 atoz      20   0 1087160 923912  13536 R 62.3 11.3  20:03.15 dask-scheduler
27955 atoz      20   0 1038904 875788  13536 S  3.7 10.7  23:57.07 dask-scheduler
27955 atoz      20   0 1441060 1.163g  12976 S  4.3 14.9  35:54.45 dask-scheduler
27955 atoz      20   0 1646204 1.358g  12976 S  4.3 17.4  37:05.86 dask-scheduler
27955 atoz      20   0 1597652 1.312g  12976 S  4.7 16.8  37:40.13 dask-scheduler

I guess I was just surprised that after doing a client.restart() we don't see the memory usage go back to some baseline.
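For reference, the driving pattern looks roughly like this. This is a minimal sketch only: our_function and file_list are placeholders for our real worker function and list of input dictionaries, the scheduler address is illustrative, and the loop simply mirrors the repeated map/restart cycles above.

from dask.distributed import Client, wait

client = Client("tcp://atoz-sched:8786")  # illustrative scheduler address

for _ in range(8):                         # roughly one pass per memory snapshot above
    futures = client.map(our_function, file_list)
    wait(futures)                          # block until every task has finished
    del futures                            # drop our own references to the results
    client.restart()                       # scheduler RES still does not fall back to baseline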

----further edits----

Some more info about what we're running, since the suggestion was that if we are passing in large data structures, we should send them directly to the workers.

We send a dictionary as the input for each task; when JSON-dumping those dicts, most are under 1000 characters.
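A quick way to confirm that size claim is something like the following (illustrative only; file_list is the list of input dicts):

import json

sizes = [len(json.dumps(d)) for d in file_list]
print(sum(s < 1000 for s in sizes), "of", len(sizes), "inputs are under 1000 characters")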

---- even further edits: Reproduced issue ----

We reproduced this issue again today. I killed off the scheduler and restarted it; we then had about 5.4 GB of free memory. We then ran the function that I'll paste below across 69614 dictionary objects that hold some file-based information (all of our workers are mapped to the same NFS datastore, and we are using Dask as a distributed file-analysis system).

Here is the function (note: squarewheels4 is a homegrown lazy file-extraction and analysis package; it uses Acora and libarchive for getting files out of a compressed archive and indexing them):

def get_mrc_failures(file_dict):
    from squarewheels4.platforms.ucs.b_series import ChassisTechSupport
    from squarewheels4.files.ucs.managed.chassis import CIMCTechSupportFile
    import re

    dimm_info_re = re.compile(r"(?P<slot>[^\|]+)\|(?P<size>\d+)\|.*\|(?P<pid>\S+)")
    return_dict = file_dict
    return_dict["return_code"] = "NOT_FILLED_OUT"
    filename = "{file_path}{file_sha1}/{file_name}".format(**file_dict)

    try:
        sw = ChassisTechSupport(filename)
    except Exception as e:
        return_dict["return_code"] = "SW_LOAD_ERROR"
        return_dict["error_msg"] = str(e)
        return return_dict

    server_dict = {}

    cimcs = sw.getlist("CIMC*.tar.gz")
    if not cimcs:
        return_dict["return_code"] = "NO_CIMCS"
        return_dict["keys_list"] = str(sw.getlist("*"))
        return return_dict

    for cimc in cimcs:
        if not isinstance(cimc, CIMCTechSupportFile): continue
        cimc_id = cimc.number
        server_dict[cimc_id] = {}

        # Get MRC file
        try:
            mrc = cimc["*MrcOut.txt"]
        except KeyError:
            server_dict[cimc_id]["response_code"] = "NO_MRC"
            continue
        # see if our end of file marker is there, should look like:
        # --- END OF FILE (Done!
        whole_mrc = mrc.read().splitlines()
        last_10 = whole_mrc[-10:]

        eof_line = [l for l in last_10 if b"END OF FILE" in l]
        server_dict[cimc_id]["response_code"] = "EOF_FOUND" if eof_line else "EOF_MISSING"

        if eof_line:
            continue

        # get DIMM types
        hit_inventory_line = False
        dimm_info = []
        dimm_error_lines = []
        equals_count = 0
        for line in whole_mrc:
            # regex each line... sigh
            if b"DIMM Inventory" in line:
                hit_inventory_line = True

            if not hit_inventory_line:
                continue

            if hit_inventory_line and b"=========" in line:
                equals_count += 1
                if equals_count > 2:
                    break
                continue

            if equals_count < 2:
                continue

            # we're in the DIMM section and not out of it yet
            line = line.decode(errors="replace")  # lines are bytes; decode before regex matching
            reg = dimm_info_re.match(line)
            if not reg:
                # line didn't match the expected DIMM format
                dimm_error_lines.append(line)
                continue
            dimm_info.append(reg.groupdict())

        server_dict[cimc_id]["dimm_info"] = dimm_info
        server_dict[cimc_id]["dimm_error_lines"] = dimm_error_lines

    return_dict["return_code"] = "COMPLETED"
    return_dict["server_dict"] = server_dict
    return return_dict

The futures are generated like this:

futures = client.map(function_name, file_list)

Once in this state, my goal was to recover and have Dask release the memory it had allocated. Here were my efforts.

Before cancelling futures:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6257840 4.883g   2324 S  0.0 62.6 121:21.93 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.1G        248M        9.9M        415M        383M
Swap:          8.0G        4.3G        3.7G

While cancelling futures:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6258864 5.261g   5144 R 60.0 67.5 122:16.38 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        176M        9.4M        126M         83M
Swap:          8.0G        4.1G        3.9G

After cancelling futures:

PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6243760 5.217g   4920 S  0.0 66.9 123:13.80 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        186M        9.4M        132M         96M
Swap:          8.0G        4.1G        3.9G

After doing a client.restart():

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
21914 atoz      20   0 6177424 5.228g   4912 S  2.7 67.1 123:20.04 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        7.5G        196M        9.4M        136M        107M
Swap:          8.0G        4.0G        4.0G

Regardless of what I ran through the distributed system, my expectation was that after cancelling the futures the memory would drop back to at least close to normal, and that after doing a client.restart() we would definitely be near our baseline. Am I wrong here?
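In code, those recovery attempts amount to this sequence (a sketch, using the futures list from the client.map call above):

# cancel the outstanding futures so the scheduler can forget those keys
client.cancel(futures)

# then restart the whole cluster: the workers are killed and recreated, yet
# the scheduler process itself keeps roughly the same resident set size
client.restart()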

---- second repro ----

Reproduced the behavior (although not total memory exhaustion) using these steps:

Here's my worker function:

def get_fault_list_v2(file_dict):
    import libarchive
    return_dict = file_dict
    filename = "{file_path}{file_sha1}/{file_name}".format(**file_dict)
    # walk the entire archive, only reading each entry's pathname
    with libarchive.file_reader(filename) as arc:
        for e in arc:
            pn = e.pathname
    return return_dict

I ran that across 68617 files.
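As in the first reproduction, the run was driven with client.map (a sketch; file_list is the 68617-entry list of input dicts):

from dask.distributed import wait

futures = client.map(get_fault_list_v2, file_list)  # 68617 small dict inputs
wait(futures)  # let everything finish before taking the memory snapshots below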

Before running, we saw this much memory being utilized:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
12256 atoz      20   0 1345848 1.107g   7972 S  1.7 14.2  47:15.24 dask-scheduler

atoz@atoz-sched:~$ free -h
              total        used        free      shared  buff/cache   available
Mem:           7.8G        3.1G        162M         22M        4.5G        4.3G
Swap:          8.0G        3.8G        4.2G

After running we saw this much:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
12256 atoz      20   0 2461004 2.133g   8024 S  1.3 27.4  66:41.46 dask-scheduler

After doing a client.restart() we saw:

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
12256 atoz      20   0 2462756 2.134g   8144 S  6.6 27.4  66:42.61 dask-scheduler

Upvotes: 3

Views: 1804

Answers (1)

MRocklin

Reputation: 57251

Generally a task should take up less than a kilobyte on the scheduler. There are a few things you can trip up on that result in storing significantly more, the most common of which is including data within the task graph, which is shown below.

Data included directly in a task graph is stored on the scheduler. This commonly occurs when using large data directly in calls like submit:

Bad

x = np.random.random(1000000)  # some large array
future = client.submit(np.add, 1, x)  # x gets sent along with the task

Good

x = np.random.random(1000000)  # some large array
x = client.scatter(x)  # scatter data explicitly to worker, get future back
future = client.submit(np.add, 1, x)  # only send along the future

The same principle applies when using other APIs as well. For more information, I recommend providing an MCVE (minimal, complete, verifiable example); it's quite hard to help otherwise.
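For example, here is a minimal sketch of the same fix applied to client.map, which is the pattern used in the question (the arrays below are placeholders):

xs = [np.random.random(1000000) for _ in range(10)]  # several large arrays
xs = client.scatter(xs)                               # one future per array; data goes straight to workers
futures = client.map(np.add, [1] * len(xs), xs)       # only futures travel through the scheduler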

Upvotes: 3
