illiadherzog

Reputation: 23

TensorFlow input pipeline halts when generating a batch

I am trying to write a TensorFlow data input pipeline using TensorFlow queues. My data consists of 128x128-pixel JPEG images with three channels (RGB).

My problem is that running my image_batch operation hangs, and I'm not sure why.

My input pipeline is built from three main functions:

  1. read_my_file_format takes a filename_queue, reads the next file, decodes it, and resizes the image.
  2. tensorflow_queue takes a list of objects and builds a TensorFlow FIFO queue from it. The enqueue op is wrapped in a QueueRunner and registered with tf.train.add_queue_runner.
  3. shuffle_queue_batch returns an operation that fetches a batch of images and labels.

Below is my code.

import tensorflow as tf


def read_my_file_format(filename_queue):
    # Read one whole file per dequeue, decode it as RGB, and resize it.
    reader = tf.WholeFileReader()
    filename, image_string = reader.read(filename_queue)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.resize_images(image, size=[256, 256])
    return image

def tensorflow_queue(lst, dtype, capacity=32):
    tensor = tf.convert_to_tensor(lst, dtype=dtype)
    fq = tf.FIFOQueue(capacity=capacity, dtypes=dtype, shapes=(()))
    fq_enqueue_op = fq.enqueue_many([tensor])
    tf.train.add_queue_runner(tf.train.QueueRunner(fq, [fq_enqueue_op]*1))
    return fq

def shuffle_queue_batch(image, label, batch_size, capacity=32, min_after_dequeue=10, threads=1):
    tensor_list = [image, label]
    dtypes = [tf.float32, tf.int32]
    shapes = [image.get_shape(), label.get_shape()]
    rand_shuff_queue = tf.RandomShuffleQueue(
                                capacity=capacity,
                                min_after_dequeue=min_after_dequeue,
                                dtypes=dtypes,
                                shapes=shapes
                                )
    rand_shuff_enqueue_op = rand_shuff_queue.enqueue(tensor_list)
    tf.train.add_queue_runner(tf.train.QueueRunner(rand_shuff_queue, [rand_shuff_enqueue_op] * threads))

    image_batch, label_batch = rand_shuff_queue.dequeue_many(batch_size)
    return image_batch, label_batch

def input_pipeline(filenames, classes, min_after_dequeue=10):
    filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
    classes_queue = tensorflow_queue(classes, tf.int32)
    image = read_my_file_format(filename_queue)
    label = classes_queue.dequeue()
    image_batch, label_batch = shuffle_queue_batch(image, label, BATCH_SIZE, min_after_dequeue=min_after_dequeue)

    return image_batch, label_batch


with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    #    filenames is a list of strings of the filenames
    #    classes is a list of ints
    #    datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()


    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    image_batch, label_batch = input_pipeline(filenames, classes)

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])

Thank you in advance.

Upvotes: 2

Views: 229

Answers (2)

dgumo

Reputation: 1878

Your code is mostly correct; one small change makes it work. It hangs because you start the queue runners before you define the queues, so no runner threads are started and nothing ever fills the queues. If you inspect the return value of start_queue_runners, you will see that the list is empty.
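
To check this in isolation, here is a minimal sketch, assuming TensorFlow 2.x with the tf.compat.v1 API (under TF 1.x the plain tf.train calls should behave the same way). The helper count_started_threads and the file names are hypothetical and exist only for this illustration; the files are never opened because only the filename queue is built.

```python
import tensorflow as tf

tf1 = tf.compat.v1  # TF 1.x-style graph/queue API (assumption: TF 2.x is installed)


def count_started_threads(build_pipeline_first):
    """Return how many threads start_queue_runners launches.

    Hypothetical helper for illustration; the filenames are placeholders
    and are never read, because only the filename queue is built.
    """
    graph = tf.Graph()
    with graph.as_default(), tf1.Session(graph=graph) as sess:
        coord = tf1.train.Coordinator()
        if build_pipeline_first:
            # Registers a QueueRunner in the graph's QUEUE_RUNNERS collection.
            tf1.train.string_input_producer(["a.jpg", "b.jpg"], shuffle=False)
        threads = tf1.train.start_queue_runners(sess=sess, coord=coord)
        if not build_pipeline_first:
            # Too late: this runner is registered but never started.
            tf1.train.string_input_producer(["a.jpg", "b.jpg"], shuffle=False)
        # Shut the runner threads down cleanly before the session closes.
        coord.request_stop()
        coord.join(threads)
        return len(threads)


print(count_started_threads(build_pipeline_first=False))  # 0: nothing to start
print(count_started_threads(build_pipeline_first=True))   # non-zero
```

With no threads running, nothing is ever enqueued, so the first dequeue blocks forever.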

Having said that, Alexandre's advice is good: tf.data is the way to get a high-performance input pipeline. Also, queue runners are not compatible with TF eager execution.

Relevant code follows:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    #    filenames is a list of strings of the filenames
    #    classes is a list of ints
    #    datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()

    # Build the pipeline first so its queue runners are registered in the graph.
    image_batch, label_batch = input_pipeline(filenames, classes)

    # Only now start the queue runners; started earlier, this returns an empty list.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])
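
A side note on the loop bound in the listing above: datasize // BATCH_SIZE * BATCH_SIZE is parsed as (datasize // BATCH_SIZE) * BATCH_SIZE, which is the number of samples rounded down to a multiple of the batch size, not the number of batches. Since each fetch already returns BATCH_SIZE samples, one pass over the data probably needs only datasize // BATCH_SIZE iterations. A small sketch with made-up numbers (the values of datasize and BATCH_SIZE are assumptions, since the question does not show them):

```python
# Assumed values for illustration; the question does not state them.
datasize = 1000
BATCH_SIZE = 32

# Bound used in the listing: floor-divide, then multiply back.
iterations_in_listing = datasize // BATCH_SIZE * BATCH_SIZE

# Number of full batches in one pass over the data.
batches_per_epoch = datasize // BATCH_SIZE

print(iterations_in_listing)  # 992
print(batches_per_epoch)      # 31
```

Also note that every sess.run on a dequeue op pulls fresh elements from the queue, so fetching image_batch and label_batch in separate calls returns images and labels that do not belong together. The combined sess.run([image_batch, label_batch]) keeps them aligned.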

Upvotes: 1

Alexandre Passos

Reputation: 5206

I strongly recommend that you switch your input pipeline from the tf.train queues to tf.data. Queue-based input pipelines are inefficient and hard to maintain.
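
As a rough idea of what that could look like, here is a minimal sketch of the same pipeline with tf.data, assuming TensorFlow 2.4 or later running eagerly. The function names mirror the question; BATCH_SIZE, the shuffle buffer size, and the helpers make_dummy_jpegs and demo are assumptions added so the example runs on its own without the original dataset.

```python
import os
import tempfile

import tensorflow as tf

BATCH_SIZE = 4  # assumed value; the question does not show it


def load_example(filename, label):
    """Read one JPEG, decode it as RGB, and resize it to 256x256."""
    image_string = tf.io.read_file(filename)
    image = tf.io.decode_jpeg(image_string, channels=3)
    image = tf.image.resize(image, [256, 256])  # result is float32
    return image, label


def input_pipeline(filenames, classes, batch_size=BATCH_SIZE, shuffle_buffer=32):
    """Build a shuffled, batched dataset of (image, label) pairs."""
    dataset = tf.data.Dataset.from_tensor_slices((filenames, classes))
    dataset = dataset.map(load_example, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(shuffle_buffer)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(tf.data.AUTOTUNE)


def make_dummy_jpegs(directory, count):
    """Hypothetical helper: write `count` random 128x128 RGB JPEGs."""
    paths = []
    for i in range(count):
        pixels = tf.random.uniform([128, 128, 3], maxval=256, dtype=tf.int32)
        path = os.path.join(directory, "img_%d.jpg" % i)
        tf.io.write_file(path, tf.io.encode_jpeg(tf.cast(pixels, tf.uint8)))
        paths.append(path)
    return paths


def demo():
    """Run the pipeline on 8 generated images and return the batch shapes."""
    with tempfile.TemporaryDirectory() as tmp:
        filenames = make_dummy_jpegs(tmp, 8)
        classes = [0, 1, 0, 1, 0, 1, 0, 1]
        return [(tuple(x.shape), tuple(y.shape))
                for x, y in input_pipeline(filenames, classes)]


print(demo())
```

With eight images and a batch size of four, iterating the dataset yields two batches. Because each image and its label travel through the pipeline as one element, they stay paired after shuffling, and there are no queue runners or coordinators to start or stop.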

Upvotes: 0
