Reputation: 1656
I have two files that are too big to load into RAM (above 100 GB).
Both files are in a similar format, for example:
A 8
B 6
C 9
And
AC 8
BA 6
CQ 35
I also have a dictionary that contains my mapping, for example:
alias_dict = {"A": "AC", "B": "BA", "C": "CQ"}
What I want to do is simply apply the dictionary to the first dataframe and then merge the two dataframes, so that in this example my expected output would be
AC 8
BA 6
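For concreteness, a minimal pandas sketch of the operation I'm after, on toy data (the column names identifier and position are assumptions):
import pandas as pd

pileup_df = pd.DataFrame({"identifier": ["A", "B", "C"], "position": [8, 6, 9]})
lists_df = pd.DataFrame({"identifier": ["AC", "BA", "CQ"], "position": [8, 6, 35]})
alias_dict = {"A": "AC", "B": "BA", "C": "CQ"}

# map the aliases onto the first frame, then keep only the rows present in both
pileup_df["identifier"] = pileup_df["identifier"].map(alias_dict)
print(pileup_df.merge(lists_df, on=["identifier", "position"]))
#   identifier  position
# 0         AC         8
# 1         BA         6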
I tried to achieve this with the following code:
pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
pileup_df.compute()
lists_df['identifier'] = lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df.compute()
intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
intersection_df.compute()
The problem is that the conversion only takes place if I include pileup_df.compute() and lists_df.compute(), as in the code above, but this actually loads these dataframes into memory. When I remove those compute statements, my dataframes remain in their unconverted form.
Just for clarity, the code that isn't working as expected is
pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
intersection_df.compute()
Is there a way to apply this conversion and merge without first loading the converted dataframe to memory?
Upvotes: 0
Views: 276
Reputation: 28683
I think you have some misunderstanding about handling dask objects - let me try to clarify.
An expression like
pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
produces a lazy output object, in this case a Series, and nothing is computed. But if you do not assign this result to a variable or back into a column of a dataframe, then not even the lazy operation is kept.
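A minimal sketch of the difference, using assumed toy data rather than your 100 GB files:
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"identifier": ["A", "B"], "position": [8, 6]}), npartitions=1
)
alias_dict = {"A": "AC", "B": "BA"}

# this builds a lazy Series, but the result is discarded immediately - nothing is kept
ddf.identifier.map(lambda i: alias_dict[i], meta=("identifier", str))

# keeping a reference (or assigning back into a column) is what records the operation
mapped = ddf.identifier.map(lambda i: alias_dict[i], meta=("identifier", str))
print(type(mapped))  # a lazy dask Series; still nothing computed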
Instead, as you had in the first example
pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
now the definition of pileup_df
is changed to include the mapping of the column, but the dataframe is still lazy and not loaded into memory (which is good!).
A line like
pileup_df.compute()
computes your object, including any changes you made by assignment, and puts it into memory. This might fill up your memory - but, again, you didn't actually assign the output. .compute()
does not change the object it is applied to; instead, it creates a new object, here a pandas DataFrame.
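A tiny sketch of that behaviour, again on assumed toy data:
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"identifier": ["A", "B"]}), npartitions=1)

# compute() leaves ddf untouched and returns a brand-new in-memory object
result = ddf.compute()
print(type(ddf))     # still a lazy dask DataFrame
print(type(result))  # <class 'pandas.core.frame.DataFrame'>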
Your code should perhaps be
pileup_df['identifier'] = pileup_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
lists_df['identifier'] = lists_df.identifier.map(lambda identifier: alias_dict[identifier], meta=('identifier', str))
intersection_df = dd.merge(pileup_df, lists_df, on=['identifier', 'position'])
intersection_df = intersection_df[['identifier', 'position']]
result = intersection_df.compute()
Are you sure the end result fits in memory? Note that you didn't mention the scheduler you are using here; you might need considerably more memory than just the size of the output, for various intermediate results. Often, it's better to immediately perform whatever final work you need to - aggregation before the compute, or output to file. Examples:
result = intersection_df.sum(...).compute() # aggregation
intersection_df.to_parquet(...)  # output to file
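Filled in with assumed arguments (the row-count aggregation and the output path are just examples, not taken from your question), these might look like:
# bring back only a small aggregate instead of the whole merged table
n_matches = intersection_df.shape[0].compute()

# or stream the merged result to disk partition by partition, never holding it all in RAM
intersection_df.to_parquet("intersection_output/", write_index=False)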
Upvotes: 1