Mohammed AlQuraishi
Mohammed AlQuraishi

Reputation: 1557

Multiple queues causing TF to lock up

I'm trying to use multiple queues for reading and batching, but this is causing TF to occasionally lock up. Here's some sample code:

import tensorflow as tf

coordinator = tf.train.Coordinator()

file_queue = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader = tf.TextLineReader()
key, serialized_example = reader.read(file_queue)
keys, serialized_examples = tf.train.batch([key, serialized_example], 10)

# repeat the code snippet below multiple times, in my case 4
file_queue_i = tf.train.string_input_producer(tf.train.match_filenames_once(...))
reader_i = tf.TextLineReader()
key_i, serialized_example_i = reader.read(file_queue_i)

initializer = tf.initialize_all_variables()

session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, intra_op_parallelism_threads=1))
session.run(initializer)

threads = tf.train.start_queue_runners(sess=session, coord=coordinator)

session.run(keys)

TensorFlow occasionally locks up at the last line, when I actually try to run something. This behavior is rather hard to reproduce using the above code however. In 1000+ runs, I could only get it to hang once. In my real code, the actual reader is more complicated, and it's using TFRecords, but otherwise everything is the same. There it hangs up 2/3 of the time with 3 queues in total. With 5 queues it seemingly never runs, and with 1 queue it seemingly never hangs. This is on a Mac with 0.6. I have a different system running Ubuntu, also with 0.6, and I get the same problem (although the frequency of locking up is much higher on the Ubuntu system).

UPDATE: A more accurate estimate of how often the above code locks up is 1 in 5,000 trials.

Upvotes: 2

Views: 2097

Answers (1)

Yaroslav Bulatov
Yaroslav Bulatov

Reputation: 57903

This is probably caused by not having enough operation threads. If you have a queue runner 1 depending on work of queue runner 2, and you run them asynchronously, then you'll need at least two op threads, set through inter_op_parallelism_threads, to guarantee that progress is being made.

In your case, you have queue runner that's filling batch thread depending on string_input_producer queue being not empty. If the queue runner associated with string_input_producer queue runs first, then everything is fine. But if batch queue runner is scheduled first, it will get stuck in string_input_producer.dequeue op waiting for string_input_producer queue to get some filenames. Since there's only 1 thread in TensorFlow op thread pool, the enqueue op of string_input_producer will never get allocated a thread to complete (ie, to execute its Compute method)

Simplest solution is to have at least as many operation threads as you have simultaneous run calls (ie, number of queues + 1). If you really want to restrict yourself to one thread, you could preload filename queue file filenames synchronously using main thread.

  coordinator = tf.train.Coordinator()

  import glob
  files = glob.glob('/temp/pipeline/*')
  if FLAGS.preload_filenames:
    file_queue = tf.FIFOQueue(capacity=len(files), dtypes=tf.string)
    enqueue_val = tf.placeholder(dtype=tf.string)
    enqueue_op = file_queue.enqueue(enqueue_val)
  else:
    file_queue = tf.train.string_input_producer(files)

  reader = tf.TextLineReader()
  key, serialized_example = reader.read(file_queue)
  keys, serialized_examples = tf.train.batch([key, serialized_example], 5,
                                             capacity=10)

  initializer = tf.initialize_all_variables()

  session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1,
                                             intra_op_parallelism_threads=1))
  print 'running initializer'
  session.run(initializer)

  if FLAGS.preload_filenames:
    print 'preloading filenames'
    for fn in files:
      session.run([enqueue_op], feed_dict={enqueue_val: fn})
      print 'size - ', session.run([file_queue.size()])
    session.run([file_queue.close()])

  print 'starting queue runners'
  threads = tf.train.start_queue_runners(sess=session, coord=coordinator)
  print 'about to run session'
  print session.run(keys)

Code above will need some encapsulation if you have more than one filenames queue. Alternatively here's a hacky work-around which should work if there's exactly prebuffer_amount filenames for all input_producer queues

queue_runners=tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
filename_queue_runners=[qr for qr in queue_runners if 'input_producer' in qr.name]
for qr in filename_queue_runners:
  for k in prebuffer_amount:
    sess.run(qr._enqueue_ops[0])

Upvotes: 2

Related Questions