egold

Reputation: 57

unable to persist dask dataframe after read_sql_table

I am trying to read a database table into a dask dataframe and then persist the dataframe. I have tried a few variations, and each one either causes an out-of-memory condition or raises an error.

I am working from a Windows 10 laptop with 8 GB of memory. The problem began when I tried to read in large MySQL or Oracle database tables. I can reproduce the problem with SQLite.

Here is the code for setting up a 700 MB SQLite table to reproduce the problem. (Please excuse any clumsiness in the Python code -- I have been a SAS data analyst for 10 years and am looking for a less expensive alternative, so I am new to Python, numpy, pandas, and dask. Note that SAS can read in the SQLite table, write it to disk, and create an index in 90 seconds, without locking up the laptop.)

import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer
import sqlite3

# function to create a list of dicts with chunkSize rows by 3 columns
# except for the first column, the columns are filled with random integers

def getChunk(chunkSize, prevCount):
    x = np.random.randint(low=0, high=2**32, size=(chunkSize,3), dtype='int64')
    y = x.ravel().view(dtype=[('a', 'i8'), ('b', 'i8'), ('c', 'i8')])
    y['a'] = [k + prevCount for k in range(chunkSize)]
    names = y.dtype.names
    listOfDicts = [dict(zip(names, row)) for row in y] 
    return listOfDicts

# set up a SQLAlchemy engine to a sqlite DB

dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
engine = create_engine(connString)

# create a table with 3 Integer columns

metadata = MetaData()
testTable = Table('testTbl', metadata,
                  Column('a', Integer, primary_key=True),
                  Column('b', Integer),
                  Column('c', Integer)
                 )

metadata.create_all(engine)
conn = engine.connect()

chunkSize = 25000
numberChunks = 1400

# teach the sqlite3 driver to store numpy int64 values as plain Python ints
sqlite3.register_adapter(np.int64, lambda x: int(x))

# use the SQLAlchemy table insert method to load list of dicts into the table, one chunk at a time
prevCount = 0

with conn.begin():
    for i in range(numberChunks):
        listOfDicts = getChunk(chunkSize, prevCount)
        conn.execute(testTable.insert(), listOfDicts)
        prevCount = prevCount + chunkSize

conn.close()
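
A quick sanity check on the row count -- chunkSize * numberChunks should give 35,000,000 rows, which is the ~700 MB table:

# count the rows just written (expect 25000 * 1400 = 35,000,000)
with engine.connect() as conn:
    print(conn.execute("SELECT COUNT(*) FROM testTbl").scalar())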

I have tried 4 variations on the dask scheduler:

  1. default scheduler -- this was OOM and the laptop locked up.

  2. local distributed scheduler with multiple processes -- this gave a tornado exception.

  3. local distributed scheduler with one process -- this was OOM.

  4. starting dask-scheduler and dask-worker from the command line, limiting the worker memory to 3 GB. This variation caused an error and the worker was killed.

The code for each variation is below. How can I make this work?

1. default scheduler -- OOM

# default scheduler -- OOM
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = df.persist() 
2. local distributed scheduler with multiple processes

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client()
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)

The exception begins like this:

tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:57522, threads: 1>>
Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 635, in wrapper
    return fun(self, *args, **kwargs)
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 821, in create_time
    return cext.proc_create_time(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 368, in _init
    self.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 699, in create_time
    self._create_time = self._proc.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 640, in wrapper
    raise NoSuchProcess(self.pid, self._name)
psutil._exceptions.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=14212)
3. local distributed scheduler with one process -- OOM

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache, get=dask.get)
client = Client(processes=False)
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df, get=dask.get)
4. dask-scheduler, dask-worker

In one command window: c:>dask-scheduler --host 127.0.0.1

In another command window: c:>dask-worker 127.0.0.1:8786 --nprocs 1 --nthreads 1 --name worker-1 --memory-limit 3GB --local-directory c:\temp2

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path='c:\\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client(address="127.0.0.1:8786")
dbPath = "C:\\temp2\\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)

The worker is killed over and over with these messages:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.12 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.16 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.24 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.31 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.39 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Worker is at 81% memory usage. Pausing worker.  Process memory: 2.46 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.47 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.54 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.61 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.66 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.73 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.81 GB -- Worker memory limit: 3.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 17916 was killed by signal 15
distributed.nanny - WARNING - Restarting worker

Upvotes: 0

Views: 1683

Answers (1)

mdurant

Reputation: 28694

I don't believe you have an index on column 'a', which means that each partition access probably uses a lot of memory within SQLite while scanning the table. In any case, pandas' access to databases via sqlalchemy is not particularly memory-efficient, so I am not hugely surprised that you see a memory spike during access.
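
If the index really is missing, you could add one before reading (standard SQLite DDL; the index name here is only illustrative):

import sqlite3

conn = sqlite3.connect("C:\\temp2\\test.db")
# index the partitioning column so each partition's range query can seek
# instead of scanning the whole table
conn.execute("CREATE INDEX IF NOT EXISTS ix_testTbl_a ON testTbl (a)")
conn.commit()
conn.close()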

You can, however, increase the number of partitions, so that each one covers a smaller slice of the data. For example:

df = ddf.read_sql_table('testTbl', connString, index_col = 'a', npartitions=20)

Alternatively, you could reduce the number of threads/processes available, so that there is more memory for each one; see the sketch below.
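
A rough sketch combining both ideas (the Client keyword arguments -- n_workers, threads_per_worker, memory_limit -- are standard LocalCluster options; the values are only illustrative):

import dask.dataframe as ddf
from dask.distributed import Client

# one worker with one thread leaves the most memory headroom per task;
# the worker spills to its local directory as it approaches the limit
client = Client(n_workers=1, threads_per_worker=1, memory_limit='3GB')

connString = "sqlite:///C:\\temp2\\test.db"
df = ddf.read_sql_table('testTbl', connString, index_col='a', npartitions=20)
df = client.persist(df)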

Note that chest is not helping you at all here: it can only store completed results, and the memory spike is happening while the data is being loaded. (Furthermore, distributed workers should spill to disk without being given an explicit cache.)

Upvotes: 1
