bfra

Reputation: 321

Error when implementing a TensorFlow input pipeline with tf.data

I have an issue implementing an input pipeline with the new tf.data TensorFlow API.

Specifically, when I include a convolution operation in the preprocessing, which I add to the pipeline with the map method, I get the following error:

tensorflow.python.framework.errors_impl.UnimplementedError: Generic conv implementation only supports NHWC tensor format for now.
 [[{{node conv_debug}} = Conv2D[T=DT_FLOAT, data_format="NCHW", dilations=[1, 1, 1, 1], padding="SAME", strides=[1, 1, 1, 1], use_cudnn_on_gpu=true](conv_debug-0-TransposeNHWCToNCHW-LayoutOptimizer, ArithmeticOptimizer/FoldMultiplyIntoConv_scaled_conv_debug_Const)]]

When I exclude the convolution from the pipeline, everything works as expected.

I attach below the minimal code needed to reproduce the problem.

Tested with 3 configurations.

Am I doing something wrong, or is it a CUDA/cuDNN related issue?

Thanks!

import numpy as np
import tensorflow as tf

image_height, image_width = 100, 200

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def serialize_to_record(record_name, label, image):
    """Create a data record and store it"""
    writer = tf.python_io.TFRecordWriter(record_name)
    image_raw = image.tostring()
    label_raw = label.encode()  # BytesList features require bytes, not str
    sample = tf.train.Example(features=tf.train.Features(feature={
        'image_raw': _bytes_feature(image_raw),
        'label_raw': _bytes_feature(label_raw)}))
    writer.write(sample.SerializeToString())
    writer.close()
    return


def _dataset_parser(record):
    """Read and deserialize a tensorflow record"""
    parsed = tf.parse_single_example(record,
        features={'image_raw': tf.FixedLenFeature([], tf.string),
                  'label_raw': tf.FixedLenFeature([], tf.string)})
    image_ = tf.decode_raw(parsed['image_raw'], tf.uint8)
    image_.set_shape(image_height * image_width * 3)
    image_ = tf.reshape(image_, (image_height, image_width, 3))
    image = tf.cast(image_, tf.float32) / 255.0
    label = parsed['label_raw']

    return {'image': image, 'label': label}


def _dataset_preprocessor(datum):
    """dummy preprocessor consisting of a convolution with a random kernel"""
    image = datum['image']
    kernel = np.random.rand(5, 5, 3, 3)
    kernel_tf = tf.constant(kernel, dtype=tf.float32)
    image = tf.expand_dims(image, axis=0)
    image = tf.nn.conv2d(image, kernel_tf, [1, 1, 1, 1], padding='SAME', name='conv_debug')
    image = tf.squeeze(image, axis=0)
    datum['image'] = image
    return datum


def _dataset_operator(record):
    """define a sequence of operation to run on the dataset"""
    datum = _dataset_parser(record)
    datum = _dataset_preprocessor(datum)
    return datum


def _dataset_operator_noconv(record):
    """define a sequence of operation to run on the dataset"""
    datum = _dataset_parser(record)
    return datum


if __name__ == '__main__':

    # create a random image as a numpy array
    image = (255.0 * np.random.rand(image_height, image_width, 3)).astype(np.uint8)
    record_path = 'example.tfrecord'

    # store a tf record to disk
    serialize_to_record(record_path, label='example', image=image)

    # build a dummy dataset of copies of the generated image
    N = 32
    dataset_filenames = [record_path for n in range(N)]
    dataset = tf.data.TFRecordDataset(dataset_filenames)

    # add parser and preprocessor to the pipeline
    include_convolution_to_pipeline = True
    if include_convolution_to_pipeline:
        dataset = dataset.map(_dataset_operator)
    else:
        dataset = dataset.map(_dataset_operator_noconv)

    # complete pipeline for iteratively visiting the dataset in batches of 8 samples
    dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.batch(8)
    dataset = dataset.repeat()
    iterator = dataset.make_initializable_iterator()
    next_data = iterator.get_next()

    # init session and go for the first batch
    sess = tf.Session()
    sess.run(iterator.initializer)
    next_data_ = sess.run(next_data)

    print('***')

Upvotes: 7

Views: 2241

Answers (2)

AGP

Reputation: 459

It is a problem with TensorFlow's layout optimizer.

Tensorflow "map" function executes the graph in CPU and placing tensors in the map otherwise confuses the layout optimizer.

Placing tf.device("/cpu:0") when creating the tensors inside the map function solves the layout optimizer confusion. Another option is to disable the layout optimizer which may cost in extra training time( it may not be feasible not to optimize the whole graph layout to execute "map" phase ).

There is already an open issue regarding this problem :

https://github.com/tensorflow/tensorflow/issues/26411

As this is a workaround, more robust solutions (executing "map" tensors on the GPU, fixes to the layout optimizer, etc.) may come in future releases of TF. For now, the suggested workaround solves my problem without having to disable layout optimization for the rest of the graph.

Upvotes: 3

Sharky

Reputation: 4543

As the error message states, the generic convolution implementation only supports the NHWC data format. Regardless of the data format, the convolution still needs batch_size as one of its dimensions, but you are applying the map function before batching. That is not the usual order, but if you need the convolution in the pipeline, apply the map function after batch:

dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.batch(8)
dataset = dataset.map(_dataset_operator)
dataset = dataset.repeat()
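
Note that the mapped function then receives a whole batch of serialized records, so the operator has to be batch-aware. A hypothetical batched variant of the asker's operator (the name _dataset_operator_batched and its internals are illustrative, not from the question) could look like:

def _dataset_operator_batched(records):
    """Hypothetical batch-aware operator: parse and convolve a whole batch."""
    parsed = tf.parse_example(records,
        features={'image_raw': tf.FixedLenFeature([], tf.string),
                  'label_raw': tf.FixedLenFeature([], tf.string)})
    # decode_raw yields shape [batch, height * width * 3]
    images = tf.decode_raw(parsed['image_raw'], tf.uint8)
    images = tf.reshape(images, (-1, image_height, image_width, 3))
    images = tf.cast(images, tf.float32) / 255.0
    kernel_tf = tf.constant(np.random.rand(5, 5, 3, 3), dtype=tf.float32)
    # the input already has a batch dimension, so no expand_dims/squeeze
    images = tf.nn.conv2d(images, kernel_tf, [1, 1, 1, 1],
                          padding='SAME', name='conv_debug')
    return {'image': images, 'label': parsed['label_raw']}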

Upvotes: 3
