Reputation:
I'm trying to deal with a large training dataset in Keras.
I use model.fit_generator
with a custom generator which reads data from a SQL file.
I get an error message telling me that I cannot use a SQLite object in two different threads:
ProgrammingError: SQLite objects created in a thread can only be used in that
same thread. The object was created in thread id 140736714019776 and this is
thread id 123145449209856
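For context, the error is not specific to Keras: any sqlite3 connection created in one thread and used from another raises it. A minimal reproduction (the file name is just a placeholder):
import sqlite3
import threading

# Connection created in the main thread (check_same_thread defaults to True)
conn = sqlite3.connect('test.db')

def query_from_worker():
    # Raises sqlite3.ProgrammingError: the connection is used from a thread
    # other than the one that created it.
    conn.execute('SELECT 1')

worker = threading.Thread(target=query_from_worker)
worker.start()
worker.join()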
I tried to do the same with an HDF5 file and ran into a segmentation fault, which I now believe is also related to the multithreaded nature of fit_generator
(see the bug reported here).
What is the proper way to use such generators? I believe it is quite common to have to read data in batches from a file when the dataset doesn't fit into memory.
Here is the code for the generator:
import pandas

class DataGenerator:
    def __init__(self, inputfile, batch_size, **kwargs):
        self.inputfile = inputfile
        self.batch_size = batch_size

    def generate(self, labels, idlist):
        # Loop forever: Keras expects the generator to yield indefinitely
        while 1:
            for batch in self._read_data_from_hdf(idlist):
                # Attach labels to the feature rows read from the HDF5 store
                batch = pandas.merge(batch, labels, how='left', on=['id'])
                Y = batch['label']
                X = batch.drop(['id', 'label'], axis=1)
                yield (X, Y)

    def _read_data_from_hdf(self, idlist):
        # Split the id list into batch-sized chunks and read each chunk from the file
        chunklist = [idlist[i:i + self.batch_size]
                     for i in range(0, len(idlist), self.batch_size)]
        for chunk in chunklist:
            yield pandas.read_hdf(self.inputfile, key='data',
                                  where='id in {}'.format(chunk))
# [...]
model.fit_generator(generator=training_generator,
                    steps_per_epoch=len(partitions['train']) // config['batch_size'],
                    validation_data=validation_generator,
                    validation_steps=len(partitions['validation']) // config['batch_size'],
                    epochs=config['epochs'])
See the full example repository here.
Thank you for your support.
Cheers,
Ben
Upvotes: 1
Views: 911
Reputation: 31
Facing the same problem, I figured out a solution by combining a thread-safety decorator with a SQLAlchemy
engine, whose connection pool can manage concurrent access to the database:
import threading

import pandas
from sqlalchemy import create_engine


class threadsafe_iter:
    """Wrap an iterator so that calls to next() are serialised with a lock."""
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)


def threadsafe_generator(f):
    """Decorator that makes a generator function thread-safe."""
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g


class DataGenerator:
    def __init__(self, inputfile, batch_size, **kwargs):
        self.inputfile = inputfile
        self.batch_size = batch_size
        # The SQLAlchemy engine manages a connection pool, so it can serve
        # queries issued from the worker threads spawned by fit_generator.
        self.sqlengine = create_engine('sqlite:///' + self.inputfile)

    def __del__(self):
        # Release the pooled connections when the generator is garbage-collected
        self.sqlengine.dispose()

    @threadsafe_generator
    def generate(self, labels, idlist):
        # Loop forever: Keras expects the generator to yield indefinitely
        while 1:
            for batch in self._read_data_from_sql(idlist):
                Y = batch['label']
                X = batch.drop(['id', 'label'], axis=1)
                yield (X, Y)

    def _read_data_from_sql(self, idlist):
        # Split the id list into batch-sized chunks and query each chunk
        chunklist = [idlist[i:i + self.batch_size]
                     for i in range(0, len(idlist), self.batch_size)]
        for chunk in chunklist:
            # Note: tuple(chunk) assumes more than one id per chunk; a
            # single-element tuple renders as '(x,)', which SQLite rejects.
            query = 'select * from data where id in {}'.format(tuple(chunk))
            df = pandas.read_sql(query, self.sqlengine)
            yield df
# Build keras model and instantiate generators
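# For reference, a minimal sketch of how the pieces could be wired together.
# The file name, labels DataFrame, and id lists below are placeholders of my
# own for illustration, not part of the original setup.
labels_df = pandas.DataFrame({'id': range(1000), 'label': [0] * 1000})
train_ids = list(range(800))
valid_ids = list(range(800, 1000))
batch_size = 32

training_generator = DataGenerator('data.sqlite', batch_size).generate(labels_df, train_ids)
validation_generator = DataGenerator('data.sqlite', batch_size).generate(labels_df, valid_ids)
train_steps = len(train_ids) // batch_size
valid_steps = len(valid_ids) // batch_size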
model.fit_generator(generator=training_generator,
steps_per_epoch=train_steps,
validation_data=validation_generator,
validation_steps=valid_steps,
epochs=10,
workers=4)
I hope that helps!
Upvotes: 3