Reputation: 1557
I'm trying to use multiple queues for reading and batching, but this is causing TF to occasionally lock up. Here's some sample code:
import tensorflow as tf
coordinator = tf.train.Coordinator()
file_queue = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 10)
# repeat the code snippet below multiple times, in my case 4
file_queue_i = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader_i = tf.TextLineReader()
key_i, serialized_example_i = reader_i.read(file_queue_i)
initializer = tf.initialize_all_variables()
session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, intra_op_parallelism_threads=1))
session.run(initializer)
threads = tf.train.start_queue_runners(sess=session, coord=coordinator)
session.run(keys)
TensorFlow occasionally locks up at the last line, when I actually try to run something. This behavior is rather hard to reproduce with the code above, however: in 1000+ runs, I could only get it to hang once. In my real code, the actual reader is more complicated and uses TFRecords, but otherwise everything is the same. There it hangs 2/3 of the time with 3 queues in total. With 5 queues it seemingly never runs, and with 1 queue it seemingly never hangs. This is on a Mac with TensorFlow 0.6. I have a different system running Ubuntu, also with 0.6, and I get the same problem (although the frequency of locking up is much higher on the Ubuntu system).
UPDATE: A more accurate estimate of how often the above code locks up is 1 in 5,000 trials.
Upvotes: 2
Views: 2097
Reputation: 57903
This is probably caused by not having enough operation threads. If queue runner 1 depends on the work of queue runner 2, and they run asynchronously, you need at least two op threads, set through inter_op_parallelism_threads, to guarantee that progress is being made.
In your case, the queue runner that fills the batch queue depends on the string_input_producer queue being non-empty. If the queue runner associated with the string_input_producer queue runs first, everything is fine. But if the batch queue runner is scheduled first, it gets stuck in the string_input_producer.dequeue op, waiting for the string_input_producer queue to receive some filenames. Since there is only one thread in the TensorFlow op thread pool, the enqueue op of string_input_producer will never get allocated a thread to complete (i.e., to execute its Compute method).
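The same starvation can be sketched outside TensorFlow with a plain Python thread pool (a hypothetical analogue, not TF code): with a single worker thread, a consumer that blocks on an empty queue occupies the only thread, so the producer scheduled behind it never gets to run.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

def consumer(q):
    # Analogue of the batch queue runner: blocks until something is enqueued.
    try:
        return q.get(timeout=1.0)
    except queue.Empty:
        return None  # starved: the producer never got a thread

def producer(q):
    # Analogue of the string_input_producer enqueue op.
    q.put('filename')

def run(n_threads):
    q = queue.Queue()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        c = pool.submit(consumer, q)  # consumer scheduled first
        pool.submit(producer, q)
        return c.result()

print(run(1))  # None: the lone thread is stuck in the consumer
print(run(2))  # 'filename': producer and consumer run concurrently
```

With one worker the consumer times out empty-handed; with two, the producer runs alongside it and the hand-off succeeds, which is exactly why at least two op threads are needed here.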
The simplest solution is to have at least as many operation threads as you have simultaneous run calls (i.e., number of queues + 1). If you really want to restrict yourself to one thread, you can preload the filename queue with filenames synchronously on the main thread.
import glob

import tensorflow as tf

coordinator = tf.train.Coordinator()
files = glob.glob('/temp/pipeline/*')

if FLAGS.preload_filenames:
    file_queue = tf.FIFOQueue(capacity=len(files), dtypes=tf.string)
    enqueue_val = tf.placeholder(dtype=tf.string)
    enqueue_op = file_queue.enqueue(enqueue_val)
else:
    file_queue = tf.train.string_input_producer(files)

reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 5,
                                           capacity=10)

initializer = tf.initialize_all_variables()
session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1,
                                           intra_op_parallelism_threads=1))

print 'running initializer'
session.run(initializer)

if FLAGS.preload_filenames:
    print 'preloading filenames'
    for fn in files:
        session.run([enqueue_op], feed_dict={enqueue_val: fn})
    print 'size - ', session.run([file_queue.size()])
    session.run([file_queue.close()])

print 'starting queue runners'
threads = tf.train.start_queue_runners(sess=session, coord=coordinator)

print 'about to run session'
print session.run(keys)
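Why preloading removes the deadlock can be sketched with a plain-Python analogue (hypothetical, not TF code): if the queue is filled synchronously on the main thread before any worker starts, a single worker thread suffices, because the consumer never has to wait on a producer.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

q = queue.Queue()
for fn in ['a.txt', 'b.txt']:  # preload synchronously on the main thread
    q.put(fn)

# One worker thread is now enough: every get() finds data already queued.
with ThreadPoolExecutor(max_workers=1) as pool:
    results = [pool.submit(q.get, True, 1.0).result() for _ in range(2)]

print(results)  # ['a.txt', 'b.txt']
```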
The code above will need some encapsulation if you have more than one filename queue. Alternatively, here's a hacky workaround that should work if there are exactly prebuffer_amount filenames for each input_producer queue:
queue_runners = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
filename_queue_runners = [qr for qr in queue_runners if 'input_producer' in qr.name]
for qr in filename_queue_runners:
    for k in range(prebuffer_amount):
        sess.run(qr._enqueue_ops[0])
Upvotes: 2