Da Tong

Reputation: 2026

How to use a TensorFlow reader and queue to read two files at the same time?

My training set contains two kinds of files: training images with file names like "1.png" and label files with names like "1.label.txt".

I found some usage of Queue and Reader in tutorials like this:

filename_queue = tf.train.string_input_producer(filenames)
key, value = reader.read(filename_queue)

However, my training set contains two kinds of files with a one-to-one correspondence between them. How can I make use of Queue and Reader as in the code above?


EDIT

I am thinking about using one queue of base names to feed two other queues, one for images and one for labels. Code like this:

with tf.Session() as sess:
  base_name_queue = tf.train.string_input_producer(['image_names'], num_epochs=20)
  base_name = base_name_queue.dequeue()
  image_name = base_name + ".png"
  image_name_queue = data_flow_ops.FIFOQueue(32, image_name.dtype.base_dtype)
  image_name_queue.enqueue([image_name])
  x = image_name_queue.dequeue()
  print_op = tf.Print(image_name, [image_name])

  qr = tf.train.QueueRunner(base_name_queue, [base_name_queue] * 4)
  coord = tf.train.Coordinator()
  enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

  for step in range(1000000):
    if coord.should_stop():
      break
    print(sess.run(print_op))

  coord.request_stop()
  coord.join(enqueue_threads)

But running this code would result in an error:

TypeError: Fetch argument of has invalid type , must be a string or Tensor. (Can not convert a FIFOQueue into a Tensor or Operation.)

and the error points to this line:

coord.join(enqueue_threads)

I think I must misunderstand how TensorFlow queue works.

Upvotes: 3

Views: 3398

Answers (2)

Salvador Dali

Reputation: 222471

Your approach with two queues can have negative consequences. Because one of your queues will hold image data (big) and the other text data (tiny), there is a chance that one queue will lag behind the other.

Instead, I would suggest you take a look at the TFRecord format. Construct a TFRecord file that contains both your data and your labels, and then use a single queue to grab data and labels at the same time.
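A minimal sketch of that approach, written against the current `tf.io` / `tf.data` API rather than the queue-era `tf.python_io.TFRecordWriter` / reader names (the feature keys `image` and `label`, and the placeholder bytes, are just illustrative):

```python
import os
import tempfile
import tensorflow as tf

# Illustrative helper: pack one (image_bytes, label) pair into a tf.train.Example.
def make_example(image_bytes, label):
    return tf.train.Example(features=tf.train.Features(feature={
        "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytes])),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
    }))

path = os.path.join(tempfile.mkdtemp(), "train.tfrecord")
with tf.io.TFRecordWriter(path) as writer:
    # In real use, image_bytes would be the raw contents of "1.png".
    writer.write(make_example(b"\x89PNG-placeholder", 3).SerializeToString())

# One record carries both fields, so image and label can never get out of sync.
feature_spec = {
    "image": tf.io.FixedLenFeature([], tf.string),
    "label": tf.io.FixedLenFeature([], tf.int64),
}
for record in tf.data.TFRecordDataset(path):
    parsed = tf.io.parse_single_example(record, feature_spec)
    print(int(parsed["label"]))  # 3
```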

Upvotes: 0

Da Tong

Reputation: 2026

I have figured out the solution to my problem. I would like to post the answer here instead of deleting my question, hoping it will help people who are new to TensorFlow.

The answer contains two parts:

Part 1: How to read files pair by pair using TensorFlow's queue

The solution is simple:

  1. Use two queues to store the two sets of files. Note that the two sets must be ordered in the same way.
  2. Dequeue from each queue and do the corresponding preprocessing.
  3. Combine the two preprocessed tensors into one list and pass the list to shuffle_batch.

Code here:

import tensorflow as tf

base_names = ['file1', 'file2']
base_tensor = tf.convert_to_tensor(base_names)
image_name_queue = tf.train.string_input_producer(
  base_tensor + '.png',
  shuffle=False # Note: must set shuffle to False to keep the two queues aligned
)
label_queue = tf.train.string_input_producer(
  base_tensor + '.label.txt',
  shuffle=False # Note: must set shuffle to False
)

# use readers to read the files
image_reader = tf.WholeFileReader()
image_key, image_raw = image_reader.read(image_name_queue)
image = tf.image.decode_png(image_raw)
label_reader = tf.WholeFileReader()
label_key, label_raw = label_reader.read(label_queue)
label = tf.decode_raw(label_raw, tf.uint8)

# preprocess the image
processed_image = tf.image.per_image_whitening(image)
# shuffle_batch needs static shapes; replace these with your actual sizes
processed_image.set_shape([256, 256, 3])
label.set_shape([1])
batch = tf.train.shuffle_batch([processed_image, label], batch_size=10,
                               capacity=100, min_after_dequeue=50)

# print a batch
sess = tf.Session()
queue_threads = tf.train.start_queue_runners(sess=sess)
print(sess.run(batch))

Part 2: Queue, QueueRunner, Coordinator and helper functions

Queue is really just a queue (the name seems redundant). A queue has two methods: enqueue and dequeue. The input of enqueue is a Tensor (well, you can enqueue normal data, but it will be converted to a Tensor internally), and the return value of dequeue is a Tensor. So you can build a pipeline of queues like this:

q1 = tf.FIFOQueue(32, tf.int32)
q2 = tf.FIFOQueue(32, tf.int32)
enq1 = q1.enqueue_many([[1, 2, 3, 4, 5]])
v1 = q1.dequeue()
enq2 = q2.enqueue(v1)
The benefit of using queues in TensorFlow is to load data asynchronously, which improves performance and saves memory. The code above is not runnable by itself, because no thread is running those operations. QueueRunner describes how to enqueue data in parallel, so the parameter for constructing a QueueRunner is a list of enqueue operations (the outputs of enqueue).
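As a plain-Python analogy (standard library only, not TensorFlow), the relationship between the enqueueing thread that a QueueRunner manages and the training loop that dequeues looks roughly like this:

```python
import queue
import threading

def producer(q, items):
    # Plays the role of a QueueRunner thread: keeps feeding the queue.
    for item in items:
        q.put(item)
    q.put(None)  # sentinel: no more data

def consume_all(q):
    # Plays the role of the training loop calling dequeue.
    results = []
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item)
    return results

q = queue.Queue(maxsize=32)  # bounded, like FIFOQueue(32, ...)
t = threading.Thread(target=producer, args=(q, [1, 2, 3, 4, 5]))
t.start()
batch = consume_all(q)
t.join()
print(batch)  # [1, 2, 3, 4, 5]
```

The bounded queue gives you the same backpressure as a FIFOQueue with a fixed capacity: the producer blocks when the queue is full, so memory use stays constant.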

After setting up all the QueueRunners, you have to start all the threads. One way is to start them when creating them:

enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

or you can start all the threads after all the setup work is done:

# add a queue runner
tf.train.add_queue_runner(tf.train.QueueRunner(q, [enq]))

# start all queue runners
queue_threads = tf.train.start_queue_runners(sess=sess)

Once all the threads are started, you have to decide when to exit. Coordinator does exactly this. It acts as a shared flag between all the running threads: if one of them finishes or runs into an error, it calls coord.request_stop(), and then every thread gets True when it calls coord.should_stop(). So the pattern for using Coordinator is:

coord = tf.train.Coordinator()

for step in range(1000000):
  if coord.should_stop():
    break
  print(sess.run(print_op))

coord.request_stop()
coord.join(enqueue_threads)
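The Coordinator's stop-flag behaviour can be mimicked with the standard library's threading.Event (again a plain-Python analogy, not TensorFlow code):

```python
import threading
import time

stop_flag = threading.Event()  # plays the role of a Coordinator

def worker(results):
    step = 0
    while not stop_flag.is_set():  # like coord.should_stop()
        results.append(step)
        step += 1
        time.sleep(0.001)

results = []
t = threading.Thread(target=worker, args=(results,))
t.start()
time.sleep(0.05)   # let the worker run for a while
stop_flag.set()    # like coord.request_stop()
t.join()           # like coord.join(threads)
print(len(results) > 0)  # True
```

Setting the flag once is enough to stop every thread that polls it, which is why a single coord.request_stop() from any thread shuts the whole pipeline down.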

Upvotes: 9
