mackcmillion

Reputation: 840

TensorFlow: race conditions when chaining multiple queues

I'd like to compute the mean of each of the RGB channels of a set of images in a multithreaded manner.

My idea was to have a string_input_producer fill a filename_queue, and then have a second FIFOQueue filled by num_threads threads, each of which loads images from the filenames in filename_queue, performs some ops on them, and enqueues the result.

This second queue is then consumed by a single thread (the main thread), which sums up all the values from it.
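The two-stage design described above can be sketched in plain Python with the standard queue and threading modules. This is an analogy rather than TensorFlow code: the images are hypothetical in-memory lists of (R, G, B) pixels standing in for decoded JPEGs, list indices stand in for filenames, and channel_means is a hypothetical helper. Each worker pushes a (per-channel sum, pixel count) pair, and the main thread adds every pair into running totals, so the result does not depend on which worker finishes first.

```python
import queue
import threading

# Hypothetical stand-ins for decoded images: each is a list of (R, G, B) pixels.
IMAGES = [
    [(10.0, 20.0, 30.0), (30.0, 40.0, 50.0)],
    [(100.0, 0.0, 0.0)],
    [(0.0, 60.0, 90.0), (0.0, 0.0, 0.0), (50.0, 30.0, 60.0)],
]


def channel_means(images, num_threads=4):
    filename_queue = queue.Queue()  # stage 1: work items (indices stand in for filenames)
    result_queue = queue.Queue()    # stage 2: (per-channel sums, pixel count) pairs

    # Fill stage 1 completely before any worker starts.
    for idx in range(len(images)):
        filename_queue.put(idx)

    def worker():
        # Drain stage 1 and push one result per image into stage 2.
        while True:
            try:
                idx = filename_queue.get_nowait()
            except queue.Empty:
                return
            pixels = images[idx]
            sums = [sum(p[c] for p in pixels) for c in range(3)]
            result_queue.put((sums, len(pixels)))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()

    # The main thread consumes exactly one result per image and accumulates it.
    total_sums = [0.0, 0.0, 0.0]
    total_count = 0
    for _ in range(len(images)):
        sums, count = result_queue.get()
        total_sums = [a + b for a, b in zip(total_sums, sums)]
        total_count += count

    for t in threads:
        t.join()
    return [s / total_count for s in total_sums]


print(channel_means(IMAGES))
```

Running it with num_threads=1 or num_threads=8 gives the same means, about [31.67, 25.0, 38.33] for the sample data.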

This is the code I have:

```python
import tensorflow as tf

# assumes filenames (a list of JPEG paths) and num_threads are defined earlier

# variables for storing the mean and some intermediate results
mean = tf.Variable([0.0, 0.0, 0.0])
total = tf.Variable(0.0)

# the filename queue and the ops to read from it
filename_queue = tf.train.string_input_producer(filenames, num_epochs=1)
reader = tf.WholeFileReader()
_, value = reader.read(filename_queue)
image = tf.image.decode_jpeg(value, channels=3)
image = tf.cast(image, tf.float32)

# per-image channel sums and pixel count (height * width)
sum = tf.reduce_sum(image, [0, 1])
num = tf.mul(tf.shape(image)[0], tf.shape(image)[1])
num = tf.cast(num, tf.float32)

# the second queue and its enqueue op
queue = tf.FIFOQueue(1000, dtypes=[tf.float32, tf.float32], shapes=[[3], []])
enqueue_op = queue.enqueue([sum, num])

# the ops performed by the main thread
img_sum, img_num = queue.dequeue()
mean_op = tf.add(mean, img_sum)
total_op = tf.add(total, img_num)

# adding new queue runner that performs enqueue_op on num_threads threads
qr = tf.train.QueueRunner(queue, [enqueue_op] * num_threads)
tf.train.add_queue_runner(qr)

init_op = tf.initialize_all_variables()

sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# the main loop runs until OutOfRangeError is raised
# (when filename_queue does not yield elements anymore)
try:
    while not coord.should_stop():
        mean, total = sess.run([mean_op, total_op])

except tf.errors.OutOfRangeError:
    print('All images processed.')
finally:
    coord.request_stop()

coord.join(threads)

# some additional computations to get the mean
total_3channel = tf.pack([total, total, total])
mean = tf.div(mean, total_3channel)
mean = sess.run(mean)
print(mean)
```
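Written out, the quantity these final lines are meant to produce is a pixel-weighted mean per channel: the channel sums of all images divided by the total pixel count. Here $n$ is the number of images, $I_i$ is image $i$, and $H_i$, $W_i$ are its height and width (notation introduced only for illustration):

$$
\mu_c = \frac{\sum_{i=1}^{n} \sum_{y=1}^{H_i} \sum_{x=1}^{W_i} I_i(y, x, c)}{\sum_{i=1}^{n} H_i W_i}, \qquad c \in \{R, G, B\}
$$

Because the numerator and denominator are plain sums, the result is independent of the order in which images arrive, and it differs from the average of per-image means whenever the images have different sizes.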

The problem is that each time I run this code, I get different results, for example:

[ 99.35347748  58.35261154  44.56705856]
[ 95.91153717  92.54192352  87.48269653]
[ 124.991745    121.83417511  121.1891861 ]

I suspect race conditions are to blame. But where would they come from? Can someone help me out?

Answers (1)

Yaroslav Bulatov

Reputation: 57893

Your QueueRunner starts num_threads threads, which race to access your reader and push their results onto the queue. The order of images on the queue will therefore vary depending on which thread finishes first.
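Ordering only changes the final result because the main loop in the question never accumulates: tf.add(mean, img_sum) reads the variable but does not write it, so each sess.run returns the initial zeros plus the sums of whichever image was dequeued last. The plain-Python sketch below mimics that pattern as an analogy, not TensorFlow code; buggy_total and running_total are hypothetical helpers, the values in IMAGE_SUMS are made up, and permuting the list stands in for threads finishing in different orders.

```python
import itertools

# Hypothetical per-image sums; each permutation models a different thread finishing order.
IMAGE_SUMS = [3.0, 5.0, 11.0]


def buggy_total(items):
    """Mimics mean_op = tf.add(mean, img_sum): the accumulator is read but never updated."""
    acc = 0.0
    result = 0.0
    for x in items:
        result = acc + x  # acc stays 0.0, so result is just the latest item
    return result


def running_total(items):
    """Mimics an in-place update such as assign_add: every item is added to the accumulator."""
    acc = 0.0
    for x in items:
        acc += x
    return acc


for order in itertools.permutations(IMAGE_SUMS):
    print(order, buggy_total(order), running_total(order))
```

The buggy version returns whichever value came last, so it changes with the ordering, while the running total is the same for every ordering.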

Update Feb 12

A simple example of chaining two queues and summing up the values from the second queue. When num_threads > 1, there is some non-determinism in the intermediate values, but the final value is always 30. When num_threads = 1, everything is deterministic.

```python
import numpy as np
import tensorflow as tf

tf.reset_default_graph()

queue_dtype = np.int32

# value_queue is a queue that will be filled with 0,1,2,3,4
# range_input_producer creates the queue and registers its queue_runner
value_queue = tf.train.range_input_producer(limit=5, num_epochs=1, shuffle=False)
value = value_queue.dequeue()

# value_squared_queue will be filled with 0,1,4,9,16
value_squared_queue = tf.FIFOQueue(capacity=50, dtypes=queue_dtype)
value_squared_enqueue = value_squared_queue.enqueue(tf.square(value))
value_squared = value_squared_queue.dequeue()

# value_squared_sum keeps a running sum of the squared values
value_squared_sum = tf.Variable(0)
value_squared_sum_update = value_squared_sum.assign_add(value_squared)

# register queue_runner in the global queue runners collection
num_threads = 2
qr = tf.train.QueueRunner(value_squared_queue, [value_squared_enqueue] * num_threads)
tf.train.queue_runner.add_queue_runner(qr)

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()

for i in range(5):
  sess.run([value_squared_sum_update])
  print(sess.run([value_squared_sum]))
```

You should see:

[0]
[1]
[5]
[14]
[30]

Or sometimes, when the order of the first two values is flipped:

[1]
[1]
[5]
[14]
[30]
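The same behaviour (non-deterministic intermediate sums, deterministic final total) can be reproduced with a plain-Python analogue of the two chained queues, using queue.Queue and threading in place of TensorFlow's queues and QueueRunner. chained_square_sums is a hypothetical helper written for illustration; it returns the running sum after each dequeue, like the printed values above.

```python
import queue
import threading


def chained_square_sums(limit=5, num_threads=2):
    value_queue = queue.Queue()          # like range_input_producer: 0, 1, ..., limit - 1
    value_squared_queue = queue.Queue()  # filled by worker threads with squared values
    for v in range(limit):
        value_queue.put(v)

    def enqueue_squares():
        # Mirrors running value_squared_enqueue repeatedly until the first queue is empty.
        while True:
            try:
                v = value_queue.get_nowait()
            except queue.Empty:
                return
            value_squared_queue.put(v * v)

    threads = [threading.Thread(target=enqueue_squares) for _ in range(num_threads)]
    for t in threads:
        t.start()

    running_sum = 0
    history = []
    for _ in range(limit):
        running_sum += value_squared_queue.get()  # like value_squared_sum.assign_add(...)
        history.append(running_sum)

    for t in threads:
        t.join()
    return history


print(chained_square_sums())
```

With num_threads=1 the trace is always [0, 1, 5, 14, 30]; with more threads the intermediate entries may differ between runs, but the last entry is always 30 because integer addition does not depend on order.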
