Reputation: 23
I am currently trying to write a TensorFlow data input pipeline using TensorFlow queues. My data consists of JPEG images with three channels (RGB), each 128x128 pixels.
My current issue is with running my image_batch operation: the operation hangs indefinitely and I'm not sure why.
Below is my code for building my input pipeline.
I have three main functions that I'm using:
read_my_file_format
takes a filename_queue, reads the file, decodes the JPEG and resizes it
tensorflow_queue
takes a list of objects and builds a TensorFlow FIFOQueue. The queue is wrapped in a QueueRunner, which is registered with tf.train.add_queue_runner
shuffle_queue_batch
is meant to return an operation that fetches a batch of images and labels.
Below is my code.
def read_my_file_format(filename_queue):
    reader = tf.WholeFileReader()
    filename, image_string = reader.read(filename_queue)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.resize_images(image, size=[256, 256])
    return image
def tensorflow_queue(lst, dtype, capacity=32):
    tensor = tf.convert_to_tensor(lst, dtype=dtype)
    fq = tf.FIFOQueue(capacity=capacity, dtypes=dtype, shapes=(()))
    fq_enqueue_op = fq.enqueue_many([tensor])
    tf.train.add_queue_runner(tf.train.QueueRunner(fq, [fq_enqueue_op] * 1))
    return fq
def shuffle_queue_batch(image, label, batch_size, capacity=32, min_after_dequeue=10, threads=1):
    tensor_list = [image, label]
    dtypes = [tf.float32, tf.int32]
    shapes = [image.get_shape(), label.get_shape()]
    rand_shuff_queue = tf.RandomShuffleQueue(
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        dtypes=dtypes,
        shapes=shapes
    )
    rand_shuff_enqueue_op = rand_shuff_queue.enqueue(tensor_list)
    tf.train.add_queue_runner(tf.train.QueueRunner(rand_shuff_queue, [rand_shuff_enqueue_op] * threads))
    image_batch, label_batch = rand_shuff_queue.dequeue_many(batch_size)
    return image_batch, label_batch
def input_pipeline(filenames, classes, min_after_dequeue=10):
    filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
    classes_queue = tensorflow_queue(classes, tf.int32)
    image = read_my_file_format(filename_queue)
    label = classes_queue.dequeue()
    image_batch, label_batch = shuffle_queue_batch(image, label, BATCH_SIZE, min_after_dequeue=min_after_dequeue)
    return image_batch, label_batch
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    # filenames is a list of strings of the filenames
    # classes is a list of ints
    # datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    image_batch, label_batch = input_pipeline(filenames, classes)

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])
Thank you in advance.
Upvotes: 2
Views: 229
Reputation: 1878
Your code is mostly correct; only a minor change is needed to make it work. The reason it is not working is that you are starting the queue runners before you declare the queues. If you look at the return value of start_queue_runners at that point, you will see that the list is empty, so no threads are ever started to fill your queues and every dequeue blocks forever.
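A quick way to confirm this (an illustrative check, not part of your original code) is to print the thread list that start_queue_runners returns when it is called before the pipeline has been built:

coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
print(threads)  # prints [] here, because no queue runners have been registered yet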
Having said that, Alexandre's advice is good: tf.data is the way to get a high-performance input pipeline, and queue runners are not compatible with the new TF eager execution mechanism.
Relevant code follows:
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    # filenames is a list of strings of the filenames
    # classes is a list of ints
    # datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()

    image_batch, label_batch = input_pipeline(filenames, classes)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    #image_batch, label_batch = input_pipeline(filenames, classes)

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])
Upvotes: 1
Reputation: 5206
I strongly recommend you switch your input pipeline from the tf.train queues to tf.data. Queue-based input pipelines are inefficient and hard to maintain.
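As a rough sketch of what that could look like for the pipeline in the question (assuming the same filenames, classes, BATCH_SIZE and NUM_EPOCHS from your code, and TF 1.x graph mode), something along these lines should work:

def load_image(filename, label):
    # Read, decode and resize one image; labels pass through unchanged.
    image_string = tf.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.resize_images(image, size=[256, 256])
    return image, label

# Build the dataset directly from the in-memory lists of paths and integer labels.
dataset = tf.data.Dataset.from_tensor_slices((filenames, classes))
dataset = dataset.map(load_image)
dataset = dataset.shuffle(buffer_size=32).batch(BATCH_SIZE).repeat(NUM_EPOCHS)

iterator = dataset.make_one_shot_iterator()
image_batch, label_batch = iterator.get_next()

with tf.Session() as sess:
    x_batch, y_batch = sess.run([image_batch, label_batch])

No coordinator or queue runners are needed; the dataset handles its own threading, shuffling and batching.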
Upvotes: 0