Reputation: 57
I am trying to read a database table into a dask dataframe and then persist the dataframe. I have tried a few variations, and they either cause an out of memory condition, or an error.
I am working from a Windows 10 laptop with 8 GB of memory. The problem began when I tried to read in large MySQL or Oracle database tables. I can reproduce the problem with SQLite.
Here is the code for setting up a 700 MB SQLite table to reproduce the problem. (Please excuse any clumsiness in the python code -- I have been a SAS data analyst for 10 years. I am looking for a less expensive alternative, so I am new to python, numpy, pandas, and dask. Note that SAS can read in the SQLite table, write it to disk, and create an index in 90 seconds, without locking the laptop.)
import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer
import sqlite3
# function to create a list of dicts with chunkSize rows by 3columns
# except for the first column, the columns are filled with random integers
def getChunk(chunkSize, prevCount):
x = np.random.randint(low=0, high=2**32, size=(chunkSize,3), dtype='int64')
y = x.ravel().view(dtype=[('a', 'i8'), ('b', 'i8'), ('c', 'i8')])
y['a'] = [k + prevCount for k in range(chunkSize)]
names = y.dtype.names
listOfDicts = [dict(zip(names, row)) for row in y]
return listOfDicts
# set up a SQLAlchemy engine to a sqlite DB
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
engine = create_engine(connString)
# create a table with 3 Integer columns
metadata = MetaData()
testTable = Table('testTbl', metadata,
Column('a', Integer, primary_key='True'),
Column('b', Integer),
Column('c', Integer)
)
metadata.create_all(engine)
conn = engine.connect()
chunkSize = 25000
numberChunks = 1400
sqlite3.register_adapter(np.int64, lambda x: int(x))
# use the SQLAlchemy table insert method to load list of dicts into the table, one chunk at a time
prevCount = 0
with conn.begin():
for i in range(0, numberChunks) :
listOfDicts = getChunk(chunkSize, prevCount)
conn.execute(testTable.insert(), listOfDicts)
prevCount = prevCount + chunkSize
conn.close()
I have tried 4 variations on the dask scheduler:
default scheduler -- this was OOM and the laptop locked up.
local distributed scheduler with multiple processes -- this gave a tornado exception
local distributed scheduler with one process -- this was OOM.
starting dask-scheduler and dask-worker from the command line, limiting the worker memory to 3 GB. This variation caused an error and the worker was killed.
The code for each variation is below. How can I make this work?
1.
# default scheduler -- OOM
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = df.persist()
local distributed scheduler
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client()
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)
The exception begins like this:
>>> tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:57522, threads: 1>>
Traceback (most recent call last):
File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 635, in wrapper
return fun(self, *args, **kwargs)
File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 821, in create_time
return cext.proc_create_time(self.pid)
ProcessLookupError: [Errno 3] No such process
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 368, in _init
self.create_time()
File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 699, in create_time
self._create_time = self._proc.create_time()
File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 640, in wrapper
raise NoSuchProcess(self.pid, self._name)
psutil._exceptions.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=14212)
one process -- OOM
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache, get=dask.get)
client = Client(processes=False)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df, get=dask.get)
dask-scheduler, dask-worker
One command line: c:>dask-scheduler --host 127.0.0.1
Another command line: c:>dask-worker 127.0.0.1:8786 --nprocs 1 --nthreads 1 --name worker-1 --memory-limit 3GB --local-directory c:\temp2
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client(address="127.0.0.1:8786")
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)
The worker is killed over and over with these messages:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.12 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.16 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.24 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.31 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.39 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Worker is at 81% memory usage. Pausing worker. Process memory: 2.46 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.47 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.54 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.61 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.66 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.73 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 2.81 GB -- Worker memory limit: 3.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 17916 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
Upvotes: 0
Views: 1683
Reputation: 28694
I don't believe you have an index on column 'a'
, which means that each partition access is probably using a lot of memory within sqlite while scanning the table. In any case, pandas' access to DBs via sqlalchemy is not particularly memory-efficient, so I am not hugely surprised that you have a memory spike during access.
You can, however, increase the number of partitions to be able to access the data. For example:
df = ddf.read_sql_table('testTbl', connString, index_col = 'a', npartitions=20)
or perhaps reduce the number of threads/processes available, so that there is more memory for each thread.
Note that chest
is not helping you at all here, it can only save completed results, and the memory spike is happening during the loading of the data (furthermore, the distributed workers should spill to disc without explicitly giving a cache).
Upvotes: 1