Reputation: 73
So I've been reading on how queuing and dequeuing works in tensorflow, and I've been playing around with making a filename queue and pulling from it.
Unfortunately I'm getting an error if I try dequeue immediately after starting the threads. Is there a reason for this? If I put in a 1 second sleep timer in, it will dequeue fine. If not, it sometimes works, but often will throw an exception (shown below code)
import tensorflow as tf
import time
with tf.Graph().as_default():
filename_list = ['data_batch_{}.mat'.format(i+1) for i in range(5)]
filename_queue = tf.train.string_input_producer(filename_list)
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
#time.sleep(1) # If I uncomment this it works
for i in range(5):
print(sess.run(filename_queue.dequeue()))
coord.request_stop()
coord.join(threads)
And the exception thrown:
---------------------------------------------------------------------------
NotFoundError Traceback (most recent call last)
<ipython-input-28-cf6ab7b71f22> in <module>()
10 #time.sleep(1)
11 for i in range(5):
---> 12 print(sess.run(filename_queue.dequeue()))
13
14 coord.request_stop()
/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in run(self, fetches, feed_dict, options, run_metadata)
331 try:
332 result = self._run(None, fetches, feed_dict, options_ptr,
--> 333 run_metadata_ptr)
334 if run_metadata:
335 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _run(self, handle, fetches, feed_dict, options, run_metadata)
571 try:
572 results = self._do_run(handle, target_list, unique_fetches,
--> 573 feed_dict_string, options, run_metadata)
574 finally:
575 # The movers are no longer used. Delete them.
/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
646 if handle is None:
647 return self._do_call(_run_fn, self._session, feed_dict, fetch_list,
--> 648 target_list, options, run_metadata)
649 else:
650 return self._do_call(_prun_fn, self._session, handle, feed_dict,
/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.pyc in _do_call(self, fn, *args)
666 except KeyError:
667 pass
--> 668 raise type(e)(node_def, op, message)
669
670 def _extend_graph(self):
NotFoundError: FetchOutputs node input_producer_Dequeue:0: not found
Upvotes: 1
Views: 267
Reputation: 126184
Thanks for letting us know about this problem. I've filed a corresponding GitHub issue, and prepared a fix, which should appear in the repository soon.
In the meantime, the following code should work, by creating a single dequeue()
op before starting the session:
import tensorflow as tf
with tf.Graph().as_default():
filename_list = ['data_batch_{}.mat'.format(i+1) for i in range(5)]
filename_queue = tf.train.string_input_producer(filename_list)
dequeued_t = filename_queue.dequeue()
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
for i in range(5):
print(sess.run(dequeued_t))
coord.request_stop()
coord.join(threads)
Upvotes: 1