mEdi Han

Reputation: 31

Preprocess huge data with a custom data generator function for keras

Actually I'm building a Keras model and I have a dataset in the msg format with over 10 million instances and 40 features, which are all categorical. For the moment I'm using just a sample of it, since reading the whole dataset and encoding it is impossible to fit into memory. Here's part of the code I'm using:

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import SGD


def model():
   model = Sequential()
   model.add(Dense(120, input_dim=233, kernel_initializer='uniform', activation='selu'))
   model.add(Dense(12, kernel_initializer='uniform', activation='sigmoid'))
   model.compile(SGD(lr=0.008),loss='mean_squared_error', metrics=['accuracy'])
   return model

def addrDataLoading():

   data=pd.read_msgpack('datum.msg')
   data=data.dropna(subset=['s_address','d_address'])
   data=data.sample(300000) # taking a sample of the whole dataset to make the encoding possible
   y=data[['s_address','d_address']]
   x=data.drop(['s_address','d_address'],1)

   encX = be().fit(x, y)
   numeric_X= encX.transform(x)
   encY=be().fit(y,y)
   numeric_Y=encY.transform(y)
   scaler=StandardScaler()
   X_all=scaler.fit_transform(numeric_X)
   x_train=X_all[0:250000,:]
   y_train=numeric_Y.iloc[0:250000,:]
   x_val=X_all[250000:,:]    
   y_val=numeric_Y.iloc[250000:,:]

   return x_train,y_train,x_val,y_val 



x_train,y_train,x_val,y_val=addrDataLoading()

model = model()  # build the network before fitting it
model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=20, batch_size=200)

So my question is how to use a custom data generator function to read and process all the data I have and not just a sample, and then use fit_generator() function to train my model?

EDIT

This is a sample of the data: netData

I think that taking different samples from the data results in different encoding dimensions.

For this sample there are 16 different categories: 4 addresses (3 bits), 4 hostnames (3 bits), 1 subnet mask (1 bit), 5 infrastructures (3 bits), 1 access zone (1 bit), so the binary encoding gives us 11 bits and the new dimension of the data is 11, previously 5. Now say that for another sample the address column has 8 different categories; that gives 4 bits in binary, and if we keep the same number of categories in the other columns, the overall encoding results in 12 dimensions. I believe that's what's causing the problem.
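To illustrate (a minimal sketch with made-up column values, assuming only that category_encoders is installed): the width of a binary encoding depends on how many unique categories the encoder sees at fit time, so fitting on different samples can produce different output dimensions.

import pandas as pd
from category_encoders import BinaryEncoder

# 4 unique categories vs. 8 unique categories in a single hypothetical column
small = pd.DataFrame({'address': ['a', 'b', 'c', 'd']})
large = pd.DataFrame({'address': ['a' + str(i) for i in range(8)]})

print(BinaryEncoder().fit_transform(small).shape[1])  # about 3 binary columns
print(BinaryEncoder().fit_transform(large).shape[1])  # about 4 binary columns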

Upvotes: 2

Views: 1693

Answers (1)

Mikhail Stepanov

Reputation: 3790

Slightly slow solution (repeating the same actions)

Edit - fit BinaryEncoder before creating the generators

Drop NAs first and work with the clean data from then on, to avoid reassigning the data frame.

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

In this solution data_generator can process the same data multiple times. If that's not critical, you can use this approach.

Define a function which reads the data and splits the index into train and test parts. It won't consume a lot of memory.

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np

def model():
   #some code defining the model


def train_test_index_split():
    # if there's enough memory to add one more column
    data = pd.read_msgpack('datum_clean.msg')  # the cleaned file saved above
    train_idx, test_idx = train_test_split(data.index) 
    return data, train_idx, test_idx


data, train_idx, test_idx = train_test_index_split()

Define and initialize the data generator, both for train and validation:

def data_generator(data, encX, encY, batch_size, n_steps, index):
    # EDIT: As the data was cleaned, you don't need dropna
    # data = data.dropna(subset=['s_address','d_address'])
    for i in range(n_steps):
        batch_idx = np.random.choice(index, batch_size)
        sample = data.loc[batch_idx]
        y = sample[['s_address', 'd_address']]
        x = sample.drop(['s_address', 'd_address'], 1)
        numeric_X = encX.transform(x)
        numeric_Y = encY.transform(y)
        scaler = StandardScaler()
        X_all = scaler.fit_transform(numeric_X)
        yield X_all, numeric_Y

The edited part now trains the binary encoders. You should sub-sample your data to create a representative training set for the encoders. I guess the error with the shape of the data was caused by an incorrectly trained BinaryEncoder (Error when checking input: expected dense_9_input to have shape (233,) but got array with shape (234,)):

def get_minimal_unique_frame(df):
    return (pd.Series([df[column].unique() for column in df], index=df.columns)  
           .apply(pd.Series)  # transform each list of unique values into a pd.Series
           .T  # transpose the frame: columns are columns again
           .fillna(method='ffill'))  # fill NaNs with the last valid value

x = get_minimal_unique_frame(data.drop(['s_address', 'd_address'], 1))
y = get_minimal_unique_frame(data[['s_address', 'd_address']])

NB: I never used category_encoders and have an incompatible system configuration, so I can't install and check it. So the former code can cause problems. In that case, I guess, you should compare the lengths of the x and y data frames, make them the same, and probably change the index of the data frames.
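For example, a hypothetical sketch of what that alignment could look like (the reindex/ffill choices here are an assumption, not part of the code above, and assume a default RangeIndex):

# Pad the shorter frame by forward-filling its last row so x and y end up
# with the same length and a shared RangeIndex before fitting the encoders.
n = max(len(x), len(y))
x = x.reindex(range(n)).fillna(method='ffill').reset_index(drop=True)
y = y.reindex(range(n)).fillna(method='ffill').reset_index(drop=True)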

encX = be().fit(x, y)
encY = be().fit(y, y)
batch_size = 200
train_steps = 100000  
val_steps = 5000

train_gen = data_generator(data, encX, encY, batch_size, train_steps, train_idx)
test_gen = data_generator(data, encX, encY, batch_size, val_steps, test_idx)

Edit Please provide an example of x_sample: run train_gen, save the output, and post x_samples and y_samples:

x_samples = []
y_samples = []
for i in range(10):
    x_sample, y_sample = next(train_gen)
    x_samples.append(x_sample)
    y_samples.append(y_sample)

Note: the data generator won't stop itself, but it will be stopped after train_steps by the fit_generator method.

Fit model with generators:

model.fit_generator(generator=train_gen, steps_per_epoch=train_steps, epochs=1,
                    validation_data=test_gen, validation_steps=val_steps)

As far as I know, Python does not copy pandas data frames unless you do it explicitly with copy() or the like. Because of that, both generators use the same object. But if you use a Jupyter Notebook, data leaks / uncollected garbage may occur, and memory troubles come with them.
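A quick illustration of that point, unrelated to the data in this question:

import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
alias = df                 # same object, nothing is copied
print(alias is df)         # True
independent = df.copy()    # an explicit copy is a new object
print(independent is df)   # False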

More efficient solution - sketch

Clean your data

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

Create a train/test split, preprocess it, and store it as numpy arrays, if you have enough disk space.

data, train_idx, test_idx = train_test_index_split()

def data_preprocessor(data, path, index):
    # data = data.dropna(subset=['s_address','d_address'])
    sample = data.loc[index]  # use the index passed in (train_idx or test_idx)
    y = sample[['s_address', 'd_address']]
    x = sample.drop(['s_address', 'd_address'], 1)
    encX = be().fit(x, y)
    numeric_X = encX.transform(x)
    encY = be().fit(y, y)
    numeric_Y = encY.transform(y)
    scaler = StandardScaler()
    X_all = scaler.fit_transform(numeric_X)
    np.save(path + '_X', X_all)
    np.save(path + '_y', numeric_Y)

data_preprocessor(data, 'train', train_idx)
data_preprocessor(data, 'test', test_idx)

Delete unnecessary data:

del data

Load your files and use the following generator:

train_X = np.load('train_X.npy')
train_y = np.load('train_y.npy')

test_X = np.load('test_X.npy')
test_y = np.load('test_y.npy')

def data_generator(X, y, batch_size, n_steps):
    idxs = np.arange(len(X))
    np.random.shuffle(idxs)
    ptr = 0

    for _ in range(n_steps):
        batch_idx = idxs[ptr:ptr+batch_size]
        x_sample = X[batch_idx]
        y_sample = y[batch_idx]
        ptr += batch_size
        if ptr >= len(X):  # wrap around so the next slice is never empty
            ptr = 0
        yield x_sample, y_sample

Prepare generators:

train_gen = data_generator(train_X, train_y, batch_size, train_steps)
test_gen = data_generator(test_X, test_y, batch_size, val_steps)

And finally fit the model. Hope one of these solutions helps. At least if Python passes arrays and data frames by reference, not by value: Stackoverflow answer about it.

Upvotes: 1
