Reputation: 309
I need to read entries from an SQL table into a dask dataframe for processing the amount of entries I need to read in may exceed the amount of entries that can fit into memory How do I accomplish this using dask?
I'm currently doing this
def get_frame_from_query(query, column_names):
frames = []
batch = []
# TODO ask stackoverflow about this.
for row in database_conn.FetchManyIter(query,batch_size=30000): #read from the database 30k rows at a time.
batch.append({ col:getattr(row,col) for col in column_names})
if len(batch) == 30000:
pd_frame = pd.DataFrame(batch, columns=column_names)
frames.append(dd.from_pandas(pd_frame, npartitions=1, sort=False))
batch = []
if len(batch) > 0:
pd_frame = pd.DataFrame(batch)
frames.append(dd.from_pandas(pd_frame, npartitions=1, sort=False))
return dd.concat(frames)
I figured when I converted it to a dask dataframe it would write the frame to disk. But looking at my memory usage it doesn't seem to be doing that.
Upvotes: 0
Views: 2981
Reputation: 28683
I would encourage you to check out the read_sql_table
function, which you may well find "just works" for your job.
Aside from that, you should not be surprised by the memory usage, since you are loading every single chunk of pandas dataframe into memory before Dask gets any chance to do anything for you. Since you are using a single iterator, it is very hard to reframe this into a lazy/delayed operation for Dask, you need some other way for tasks to get the parts of your data independently, in parallel.
Upvotes: 1