Fergal

Reputation: 73

Dequeue immediately after starting threads fails

So I've been reading up on how queuing and dequeuing work in TensorFlow, and I've been playing around with making a filename queue and pulling from it.

Unfortunately, I'm getting an error if I try to dequeue immediately after starting the threads. Is there a reason for this? If I put a 1-second sleep in, it dequeues fine. Without it, it sometimes works, but often throws an exception (shown below the code).

import tensorflow as tf
import time
with tf.Graph().as_default():
    filename_list = ['data_batch_{}.mat'.format(i+1) for i in range(5)]
    filename_queue = tf.train.string_input_producer(filename_list)

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        #time.sleep(1) # If I uncomment this it works
        for i in range(5):
            print(sess.run(filename_queue.dequeue()))

        coord.request_stop()
        coord.join(threads)

And the exception that is thrown:

---------------------------------------------------------------------------
NotFoundError                             Traceback (most recent call last)
<ipython-input-28-cf6ab7b71f22> in <module>()
     10         #time.sleep(1)
     11         for i in range(5):
---> 12             print(sess.run(filename_queue.dequeue()))
     13 
     14         coord.request_stop()

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in run(self, fetches, feed_dict, options, run_metadata)
    331     try:
    332       result = self._run(None, fetches, feed_dict, options_ptr,
--> 333                          run_metadata_ptr)
    334       if run_metadata:
    335         proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _run(self, handle, fetches, feed_dict, options, run_metadata)
    571     try:
    572       results = self._do_run(handle, target_list, unique_fetches,
--> 573                              feed_dict_string, options, run_metadata)
    574     finally:
    575       # The movers are no longer used. Delete them.

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
    646     if handle is None:
    647       return self._do_call(_run_fn, self._session, feed_dict, fetch_list,
--> 648                            target_list, options, run_metadata)
    649     else:
    650       return self._do_call(_prun_fn, self._session, handle, feed_dict,

/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_call(self, fn, *args)
    666         except KeyError:
    667           pass
--> 668       raise type(e)(node_def, op, message)
    669 
    670   def _extend_graph(self):

NotFoundError: FetchOutputs node input_producer_Dequeue:0: not found

Upvotes: 1

Views: 267

Answers (1)

mrry

Reputation: 126184

Thanks for letting us know about this problem. I've filed a corresponding GitHub issue, and prepared a fix, which should appear in the repository soon.

In the meantime, the following code should work; it creates a single dequeue() op before starting the session, rather than one per loop iteration:

import tensorflow as tf
with tf.Graph().as_default():
    filename_list = ['data_batch_{}.mat'.format(i+1) for i in range(5)]
    filename_queue = tf.train.string_input_producer(filename_list)
    dequeued_t = filename_queue.dequeue()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        for i in range(5):
            print(sess.run(dequeued_t))

        coord.request_stop()
        coord.join(threads)
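The fix above can be sketched without TensorFlow at all: in TF 1.x, `queue.dequeue()` is graph *construction*, not a read, so calling it inside the loop adds a fresh op to a graph that the queue-runner threads are already executing. Here is a minimal pure-Python model of that behavior (the `FakeGraph` class is purely illustrative, not a TensorFlow API):

```python
# Illustrative stand-in for a TF 1.x graph: every "op-building" call
# appends a node, mirroring how filename_queue.dequeue() works.
class FakeGraph:
    def __init__(self):
        self.ops = []

    def dequeue(self):
        # Each call creates and registers a NEW op, like tf dequeue().
        name = 'Dequeue_{}'.format(len(self.ops))
        self.ops.append(name)
        return name

# Anti-pattern (the question's loop): one new op per iteration.
bad_graph = FakeGraph()
for i in range(5):
    bad_graph.dequeue()          # graph keeps growing mid-run
assert len(bad_graph.ops) == 5

# Fixed pattern (the answer): build the op once, reuse the handle.
good_graph = FakeGraph()
dequeued_t = good_graph.dequeue()  # built before the "session" starts
for i in range(5):
    pass                           # sess.run(dequeued_t) would go here
assert len(good_graph.ops) == 1
```

In real TensorFlow the growing-graph version races against the just-started queue-runner threads, which is why the original code only fails intermittently and a `time.sleep(1)` appears to "fix" it.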

Upvotes: 1
