watch-this
watch-this

Reputation: 1

How to get this OCR model to work with variable length examples?

This is a modified version of the OCR example in keras docs. First you'll need to download the input data which is a folder containing 1000 photos that have fixed length (5) captchas.

curl -LO https://github.com/AakashKumarNain/CaptchaCracker/raw/master/captcha_images_v2.zip
unzip -qq captcha_images_v2.zip 

This my version which needs adjustment

from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import (LSTM, Bidirectional, Conv2D, Dense,
                                     Dropout, Input, Layer, MaxPooling2D,
                                     Reshape)
from tensorflow.keras.optimizers import Adam
from tensorflow.python.keras.layers import StringLookup


class TrainManager:
    def __init__(self, src, image_width=200, image_height=50, batch_size=16):
        src = Path(src)
        self.image_width = image_width
        self.image_height = image_height
        self.batch_size = batch_size
        self.images = []
        self.labels = []
        for image in src.glob('*.png'):
            self.labels.append(image.stem)
            self.images.append(image.as_posix())
        self.max_label_length = len(max(self.labels, key=len))
        self.characters = sorted(set(''.join(self.labels)))
        self.char_to_num = StringLookup(vocabulary=self.characters, mask_token=None)
        self.num_to_char = StringLookup(
            vocabulary=self.char_to_num.get_vocabulary(), mask_token=None, invert=True
        )

    def encode_sample(self, img_path, label):
        img = tf.io.read_file(img_path)
        img = tf.io.decode_png(img, channels=1)
        img = tf.image.convert_image_dtype(img, tf.float32)
        img = tf.image.resize(img, [self.image_height, self.image_width])
        img = tf.transpose(img, perm=[1, 0, 2])
        label = self.char_to_num(
            tf.strings.unicode_split(label, input_encoding='UTF-8')
        )
        return {'image': img, 'label': label}

    def create_dataset(self, x, y, batch_size):
        dataset = tf.data.Dataset.from_tensor_slices((x, y))
        return (
            dataset.map(self.encode_sample, num_parallel_calls=tf.data.AUTOTUNE)
            .batch(batch_size)
            .prefetch(buffer_size=tf.data.AUTOTUNE)
        )

    def create_datasets(self, train_size=0.9, shuffle=True):
        images, labels = np.array(self.images), np.array(self.labels)
        size = len(images)
        indices = np.arange(size)
        if shuffle:
            np.random.shuffle(indices)
        train_samples = int(size * train_size)
        x_train, y_train = (
            images[indices[:train_samples]],
            labels[indices[:train_samples]],
        )
        x_valid, y_valid = (
            images[indices[train_samples:]],
            labels[indices[train_samples:]],
        )
        train_dataset = self.create_dataset(x_train, y_train, self.batch_size)
        valid_dataset = self.create_dataset(x_valid, y_valid, self.batch_size)
        return train_dataset, valid_dataset

    def display_dataset(self, dataset, n_rows=1, n_cols=1, fig_size=(10, 5)):
        _, ax = plt.subplots(n_rows, n_cols, figsize=fig_size)
        for batch in dataset.take(1):
            images = batch['image']
            labels = batch['label']
            for i in range(n_rows * n_cols):
                img = (images[i] * 255).numpy().astype('uint8')
                label = (
                    tf.strings.reduce_join(self.num_to_char(labels[i]))
                    .numpy()
                    .decode('utf-8')
                )
                row = i // n_rows
                col = i % n_cols
                ax[row, col].imshow(img[:, :, 0].T, cmap='gray')
                ax[row, col].set_title(label)
                ax[row, col].axis('off')

    def create_model(self, training=True):
        x0 = Input(
            shape=(self.image_width, self.image_height, 1),
            name='image',
            dtype='float32',
        )
        x = Conv2D(
            32,
            (3, 3),
            activation='relu',
            kernel_initializer='he_normal',
            padding='same',
            name='Conv1',
        )(x0)
        x = MaxPooling2D((2, 2), name='pool1')(x)
        x = Conv2D(
            64,
            (3, 3),
            activation='relu',
            kernel_initializer='he_normal',
            padding='same',
            name='Conv2',
        )(x)
        x = MaxPooling2D((2, 2), name='pool2')(x)
        new_shape = ((self.image_width // 4), (self.image_height // 4) * 64)
        x = Reshape(target_shape=new_shape, name='reshape')(x)
        x = Dense(64, activation='relu', name='dense1')(x)
        x = Dropout(0.2)(x)
        x = Bidirectional(LSTM(128, return_sequences=True, dropout=0.25))(x)
        x = Bidirectional(LSTM(64, return_sequences=True, dropout=0.25))(x)
        x = Dense(
            len(self.char_to_num.get_vocabulary()) + 1,
            activation='softmax',
            name='dense2',
        )(x)
        if not training:
            return Model(x0, x)
        labels = Input(name='label', shape=(None,), dtype='float32')
        output = CTCLayer(name='ctc_loss')(labels, x)
        model = Model(inputs=[x0, labels], outputs=output, name='ocr_model_v1')
        return model

    def decode_batch_predictions(self, pred):
        input_len = np.ones(pred.shape[0]) * pred.shape[1]
        results = tf.keras.backend.ctc_decode(
            pred, input_length=input_len, greedy=True
        )[0][0][:, : self.max_label_length]
        output_text = []
        for result in results:
            result = (
                tf.strings.reduce_join(self.num_to_char(result)).numpy().decode('utf-8')
            )
            output_text.append(result)
        return output_text


class CTCLayer(Layer):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = tf.keras.backend.ctc_batch_cost

    def call(self, y_true, *args, **kwargs):
        y_pred = args[0]
        batch_length = tf.cast(tf.shape(y_true)[0], dtype='int64')
        input_length = tf.cast(tf.shape(y_pred)[1], dtype='int64')
        label_length = tf.cast(tf.shape(y_true)[1], dtype='int64')
        input_length = input_length * tf.ones(shape=(batch_length, 1), dtype='int64')
        label_length = label_length * tf.ones(shape=(batch_length, 1), dtype='int64')
        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)
        return y_pred


def main():
    w, h = 200, 50
    manager = TrainManager('captcha_images_v2', w, h)
    print('Number of images found: ', len(manager.images))
    print('Number of labels found: ', len(manager.labels))
    print('Number of unique characters: ', len(manager.characters))
    print('Characters present: ', manager.characters)
    optimizer = Adam()
    m = manager.create_model()
    m.compile(optimizer)
    m.summary()
    early_stopping = EarlyStopping(
        monitor='val_loss', patience=10, restore_best_weights=True
    )
    tr_dataset, val_dataset = manager.create_datasets()
    history = m.fit(
        tr_dataset,
        validation_data=val_dataset,
        epochs=100,
        callbacks=[early_stopping],
    )

It works perfectly fine for fixed lengths captchas and note the names of the files 2b827.png, 2bg48.png, 2cegf.png, ... are the labels contained in their respective images.

c1 c2 c3

If I modify 2b827.png to 2b827abcde.png, I'll get the following error:

2021-10-12 09:14:41.276269: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/100
18/59 [========>.....................] - ETA: 8s - loss: 29.0998Traceback (most recent call last):
  File "/Users/user/Desktop/ocr_example.py", line 216, in <module>
    main()
  File "/Users/user/Desktop/ocr_example.py", line 179, in main
    history = m.fit(
  File "/usr/local/lib/python3.9/site-packages/keras/engine/training.py", line 1184, in fit
    tmp_logs = self.train_function(iterator)
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/def_function.py", line 885, in __call__
    result = self._call(*args, **kwds)
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/def_function.py", line 917, in _call
    return self._stateless_fn(*args, **kwds)  # pylint: disable=not-callable
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/function.py", line 3039, in __call__
    return graph_function._call_flat(
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/function.py", line 1963, in _call_flat
    return self._build_call_outputs(self._inference_function.call(
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/function.py", line 591, in call
    outputs = execute.execute(
  File "/usr/local/lib/python3.9/site-packages/tensorflow/python/eager/execute.py", line 59, in quick_execute
    tensors = pywrap_tfe.TFE_Py_Execute(ctx._handle, device_name, op_name,
tensorflow.python.framework.errors_impl.InvalidArgumentError:  Cannot add tensor to the batch: number of elements does not match. Shapes are: [tensor]: [10], [batch]: [5]
     [[node IteratorGetNext (defined at usr/local/lib/python3.9/site-packages/keras/engine/training.py:841) ]] [Op:__inference_train_function_11873]

Errors may have originated from an input operation.
Input Source operations connected to node IteratorGetNext:
 iterator (defined at usr/local/lib/python3.9/site-packages/keras/engine/training.py:1184)

Function call stack:
train_function

I need to modify it to accept and output variable lengths inputs/outputs. I think the inputs need to be padded respective to the longest label contained in the dataset.

Here's an example to illustrate what I think may work: given we have abc.png, abcde.png, and abcdefghij.png, inputs as well as possible outputs, they should have a form similar to:

But this approach will be limited to examples of length 10. I expect problems for longer than 10 labels. An ideal solution should accept any length and output any length as well. Here's an issue addressing the very same problem, which was resolved by padding the labels which I think has unforeseen shortcomings for the same reasons I mentioned.

Upvotes: 1

Views: 1259

Answers (1)

AloneTogether
AloneTogether

Reputation: 26708

So, let's assume you want to use different, longer labels than your current ones, which have a fix length of 5 as I can see when printing the max_length:

max_length = max([len(label) for label in labels])
# 5

You then need to adjust your functions def encode_sample(self, img_path, label) and create_dataset(self, x, y, batch_size) so that the labels are shorter than or equals to the maximum length, which can be arbitrary. Here, I am assuming a max_length=20 and that 0 is reserved as the padding character:

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
max_length = 20
for data in train_dataset:
  x,y = data
  data_dict = encode_single_sample(x, y)
  difference = max_length -  data_dict['label'].shape[0]
  if difference != 0:
    padding = np.zeros(difference)
    data_dict['label'] = np.concatenate((data_dict['label'], padding))

I think you get the idea. Note that you may also need to adjust your input shapes in your model. If you want to avoid padding this way, you only need to make sure that each individual batch, i.e. all labels in that batch, have the same shape. During inference you can have any length if your input shape remains shape=(None,).

Upvotes: 2

Related Questions