Eliran Turgeman

Reputation: 1656

Mapping and merging in dask without loading to memory

I have two files that are too large to load into RAM (above 100GB).

Both files are in a similar format, for example:

A  8
B  6
C  9

And

AC  8
BA  6
CQ  35

I also have a dictionary that will include my mapping, for example

alias_dict = {"A": "AC", "B": "BA", "C": "CQ"}

What I want to do is apply the dictionary to the first dataframe and then merge the two dataframes, so that in this example my expected output would be

AC  8
BA  6

I try to achieve this with the following code

pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
pileup_df.compute()
lists_df['identifier'] = lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df.compute()

intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
intersection_df.compute()

The problem is that the conversion only takes place if I call pileup_df.compute() and lists_df.compute() as in the code above, but this loads the dataframes into memory.
When I remove those compute statements, the dataframes remain in their unconverted form.

Just for clarity, the code that isn't working as expected is

pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))

intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
intersection_df.compute()

Is there a way to apply this conversion and merge without first loading the converted dataframe to memory?

Upvotes: 0

Views: 276

Answers (1)

mdurant

Reputation: 28683

I think you have some misunderstanding about handling dask objects - let me try to clarify.

An expression like

pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))

produces a lazy output object, in this case a series, and nothing is computed. But if you do not assign it to a variable or back into a column of a dataframe, then not even the lazy definition is kept; the object is simply discarded.

Instead, as you had in the first example

pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))

now the definition of pileup_df is changed to include the mapping of the column, but the dataframe is still lazy and not loaded into memory (which is good!).

A line like

pileup_df.compute()

computes your object, including any changes you made by assignment, and puts it into memory. This might fill up your memory - but, again, you didn't actually assign the output. .compute() does not change the object it is applied to, but instead creates a new object, here a Pandas dataframe.

Your code should perhaps be

pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
result = intersection_df.compute()

Are you sure the end result fits in memory? Note that you didn't mention the scheduler you are using here; you might need considerably more memory than just the size of the output, for various intermediate results. Often, it's better to immediately perform whatever final work you need to - aggregation before the compute, or output to file. Examples:

result = intersection_df.sum(...).compute()  # aggregation
intersection_df.to_parquet(...)  # output to file

Upvotes: 1
