Reputation: 840
I'd like to compute the mean of each of the RGB channels of a set of images in a multithreaded manner.
My idea was to have a string_input_producer that fills a filename_queue, and then have a second FIFOQueue that is filled by num_threads threads that load images from the filenames in filename_queue, perform some ops on them, and then enqueue the result.
This second queue is then accessed by one single thread (the main thread) that sums up all the values from the queue.
This is the code I have:
# variables for storing the mean and some intermediate results
mean = tf.Variable([0.0, 0.0, 0.0])
total = tf.Variable(0.0)
# the filename queue and the ops to read from it
filename_queue = tf.train.string_input_producer(filenames, num_epochs=1)
reader = tf.WholeFileReader()
_, value = reader.read(filename_queue)
image = tf.image.decode_jpeg(value, channels=3)
image = tf.cast(image, tf.float32)
sum = tf.reduce_sum(image, [0, 1])
num = tf.mul(tf.shape(image)[0], tf.shape(image)[1])
num = tf.cast(num, tf.float32)
# the second queue and its enqueue op
queue = tf.FIFOQueue(1000, dtypes=[tf.float32, tf.float32], shapes=[[3], []])
enqueue_op = queue.enqueue([sum, num])
# the ops performed by the main thread
img_sum, img_num = queue.dequeue()
mean_op = tf.add(mean, img_sum)
total_op = tf.add(total, img_num)
# adding new queue runner that performs enqueue_op on num_threads threads
qr = tf.train.QueueRunner(queue, [enqueue_op] * num_threads)
tf.train.add_queue_runner(qr)
init_op = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# the main loop being executed until the OutOfRangeError
# (when filename_queue does not yield elements anymore)
try:
    while not coord.should_stop():
        mean, total = sess.run([mean_op, total_op])
except tf.errors.OutOfRangeError:
    print 'All images processed.'
finally:
    coord.request_stop()
coord.join(threads)
# some additional computations to get the mean
total_3channel = tf.pack([total, total, total])
mean = tf.div(mean, total_3channel)
mean = sess.run(mean)
print mean
The problem is that each time I run this function I get different results, for example:
[ 99.35347748 58.35261154 44.56705856]
[ 95.91153717 92.54192352 87.48269653]
[ 124.991745 121.83417511 121.1891861 ]
I blame this on race conditions. But where do those race conditions come from? Can someone help me out?
Upvotes: 4
Views: 712
Reputation: 57893
Your QueueRunner will start num_threads threads, which will race to access your reader and push the result onto the queue. The order of images on the queue will vary depending on which thread finishes first.
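The same effect can be reproduced without TensorFlow. Here is a minimal sketch, assuming plain Python threads and the standard queue module as stand-ins for the QueueRunner threads and the FIFOQueue (run_pipeline and worker are hypothetical names, not TensorFlow API). Several workers pull from a shared input queue and push onto a results queue. The arrival order may differ between runs, but an order-independent reduction such as a sum does not change.

```python
import queue
import threading


def run_pipeline(values, num_threads):
    """Square each value on worker threads; return results in arrival order."""
    inputs = queue.Queue()
    for v in values:
        inputs.put(v)
    results = queue.Queue()

    def worker():
        # All inputs are enqueued before the threads start, so an empty
        # input queue means there is no more work.
        while True:
            try:
                v = inputs.get_nowait()
            except queue.Empty:
                return
            results.put(v * v)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Drain the results queue on the main thread.
    out = []
    while not results.empty():
        out.append(results.get())
    return out


arrival_order = run_pipeline(range(5), num_threads=2)
print(arrival_order)       # order may differ between runs
print(sum(arrival_order))  # always 30
```

With num_threads=1 the arrival order is fixed as well, since a single worker processes the inputs in sequence.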
Update Feb 12
A simple example of chaining two queues and summing up values from the second queue. When using num_threads > 1, there's some non-determinism in the intermediate values, but the final value will always be 30. When num_threads=1, everything is deterministic.
import numpy as np
import tensorflow as tf

tf.reset_default_graph()
queue_dtype = np.int32
# value_queue is a queue that will be filled with 0,1,2,3,4
# range_input_producer creates the queue and registers its queue_runner
value_queue = tf.train.range_input_producer(limit=5, num_epochs=1, shuffle=False)
value = value_queue.dequeue()
# value_squared_queue will be filled with 0,1,4,9,16
value_squared_queue = tf.FIFOQueue(capacity=50, dtypes=queue_dtype)
value_squared_enqueue = value_squared_queue.enqueue(tf.square(value))
value_squared = value_squared_queue.dequeue()
# value_squared_sum keeps running sum of squares of values
value_squared_sum = tf.Variable(0)
value_squared_sum_update = value_squared_sum.assign_add(value_squared)
# register queue_runner in the global queue runners collection
num_threads = 2
qr = tf.train.QueueRunner(value_squared_queue, [value_squared_enqueue] * num_threads)
tf.train.queue_runner.add_queue_runner(qr)
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.start_queue_runners()
for i in range(5):
    sess.run([value_squared_sum_update])
    print sess.run([value_squared_sum])
You should see:
[0]
[1]
[5]
[14]
[30]
Or sometimes (when the order of the first 2 values is flipped):
[1]
[1]
[5]
[14]
[30]
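Applied to the question, the same pattern suggests keeping running totals of the per-image channel sums and pixel counts (as assign_add does above) and dividing once at the end. Note that tf.add only returns a new tensor and does not update the variable, so it does not accumulate anything by itself. Below is a minimal sketch of the arithmetic in plain Python, assuming each dequeued item is a (channel_sums, num_pixels) pair like the question's [sum, num]. The helper name channel_means and the sample numbers are made up for illustration.

```python
def channel_means(items):
    """Accumulate (channel_sums, num_pixels) pairs into per-channel means."""
    total_sum = [0.0, 0.0, 0.0]
    total_pixels = 0.0
    for channel_sums, num_pixels in items:
        # Running totals: the result does not depend on the order of items.
        for c in range(3):
            total_sum[c] += channel_sums[c]
        total_pixels += num_pixels
    return [s / total_pixels for s in total_sum]


# Two hypothetical images: one with 4 pixels, one with 2 pixels.
items = [([40.0, 80.0, 120.0], 4), ([20.0, 10.0, 0.0], 2)]
print(channel_means(items))                  # [10.0, 15.0, 20.0]
print(channel_means(list(reversed(items))))  # same result, order flipped
```

Because only sums are accumulated, the order in which the threads deliver images should not affect the final mean.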
Upvotes: 2