Here's a simplified version of my code.
import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster
def main():
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)
    csv_path_one = ""  # both files have 70 columns and around 70 million rows, at about 25 gigabytes
    csv_path_two = ""
    # the columns are a mix of ints, floats, datetimes, and strings
    # almost all strings are shorter than 15 characters; two of the longest string columns have a max length of 70
    left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
    right_df = dask_frame.read_csv(csv_path_two, sep=",", quotechar="\"", encoding="utf-8", dtype="object")
    cand_keys = [""]  # I have 3 candidate key columns
    merged = dask_frame.merge(left_df, right_df, how='outer', on=cand_keys, suffixes=("_L", "_R"), indicator=True)
    missing_mask = merged["_merge"] != 'both'
    missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]
    print(f"Running {client}")
    missing_findings.to_csv("results/findings-*.csv")
    client.close()
    cluster.close()

if __name__ == '__main__':
    main()
This example never finishes. Dask gets to a certain point, then one or more workers instantly exceed the memory limit; the nanny kills them and all of their progress is rolled back.
Looking at the diagnostics page, the memory spikes usually happen about halfway through the shuffle-split tasks.
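For reference, here's a sketch of the kind of change I've been considering: explicit dtypes and smaller partitions at read time, plus spilling the shuffle to disk. The column names in the dtype map are placeholders, and I haven't verified that this avoids the spike.

    # placeholder column names; the real files have 70 columns
    explicit_dtypes = {"key_one": "object", "key_two": "object", "some_count": "int64"}
    left_df = dask_frame.read_csv(
        csv_path_one,
        sep="|",
        quotechar="+",
        encoding="Latin-1",
        blocksize=32_000_000,   # ~32MB, half the default, so each shuffle chunk is smaller
        dtype=explicit_dtypes,  # real dtypes instead of reading every column as object
    )
    merged = dask_frame.merge(
        left_df,
        right_df,
        how='outer',
        on=cand_keys,
        suffixes=("_L", "_R"),
        indicator=True,
        shuffle="disk",         # spill shuffle intermediates to disk instead of holding them in memory
    )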
I'm running Dask 2.9.1 on Windows. I can update Dask, but it's a pain with my current setup, and I don't know whether it would fix my issue.