user378147
user378147

Reputation:

Keras: deal with threads and large datasets

I'm trying to deal with a large training dataset in Keras.

I use model.fit_generator with a custom generator which reads data from a SQL file.

I've got an error message which tells me that I cannot use an SQLite object in two different threads:

ProgrammingError: SQLite objects created in a thread can only be used in that 
same thread.The object was created in thread id 140736714019776 and this is 
thread id 123145449209856

I tried to do the same with an HDF5 file, and ran into a segmentation fault which I now believe is also related the the multithreaded character of fit_generator (see bug reported here).

What is the proper way to use those generators, as I believe it is quite common to have to read data by batch from a file for datasets that don't fit into memory.

Here is the code for the generator:

class DataGenerator:
    def __init__(self, inputfile, batch_size, **kwargs):
        self.inputfile = inputfile
        self.batch_size = batch_size

    def generate(self, labels, idlist):
        while 1:
            for batch in self._read_data_from_hdf(idlist):
                batch = pandas.merge(batch, labels, how='left', on=['id'])
                Y = batch['label']
                X = batch.drop(['id', 'label'], axis=1)
                yield (X, Y)    

    def _read_data_from_hdf(self, idlist):
        chunklist = [idlist[i:i + self.batch_size] for i in range(0, len(idlist), self.batch_size)]
        for chunk in chunklist:
            yield pandas.read_hdf(self.inputfile, key='data', where='id in {}'.format(chunk))

# [...]

model.fit_generator(generator=training_generator,
                    steps_per_epoch=len(partitions['train']) // 
                    config['batch_size'],
                    validation_data=validation_generator,
                    validation_steps=len(partitions['validation']) // 
                    config['batch_size'],
                    epochs=config['epochs'])

See the full example repository here.

Thank you for your support.

Cheers,

Ben

Upvotes: 1

Views: 911

Answers (1)

Valentin Perret
Valentin Perret

Reputation: 31

Facing the same problem, I figured out a solution by combining a thread safety decorator with a sqlalchemy engine that can manage concurrent access to the database:

import pandas
from sqlalchemy import create_engine

class threadsafe_iter:
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)


def threadsafe_generator(f):
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g


class DataGenerator:
    def __init__(self, inputfile, batch_size, **kwargs):
        self.inputfile = inputfile
        self.batch_size = batch_size
        self.sqlengine = create_engine('sqlite:///' + self.inputfile)

    def __del__(self):
        self.sqlengine.dispose()

    @threadsafe_generator
    def generate(self, labels, idlist):
        while 1:
            for batch in self._read_data_from_sql(idlist):
                Y = batch['label']
                X = batch.drop(['id', 'label'], axis=1)
                yield (X, Y)

    def _read_data_from_sql(self, idlist):
        chunklist = [idlist[i:i + self.batch_size]
                     for i in range(0, len(idlist), self.batch_size)]
        for chunk in chunklist:
            query = 'select * from data where id in {}'.format(tuple(chunk))
            df = pandas.read_sql(query, self.sqlengine)
            yield df

# Build keras model and instantiate generators

model.fit_generator(generator=training_generator,
                    steps_per_epoch=train_steps,
                    validation_data=validation_generator,
                    validation_steps=valid_steps,
                    epochs=10,
                    workers=4)

I hope that helps!

Upvotes: 3

Related Questions