Reputation: 565
I am trying to train a CNN with images created during program execution. I have a game environment (not created by me) that generates screen images that depend on actions taken in the game. The actions are controlled by the learnt CNN.
These images are then pushed into a RandomShuffleQueue, from which mini-batches are dequeued and used to train the CNN on the correct action. I would like to do this (game play and training) asynchronously, with the game being played and its screens added to the RandomShuffleQueue in a separate thread from the one used to train the model. Here is a very simplified version of what I am trying to do:
import time

import tensorflow as tf
from game_env import game

experience = tf.RandomShuffleQueue(10000,
                                   1000, tf.float32,
                                   shapes=[32, 32],
                                   name='experience_replay')

def perceive(game):
    rawstate = game.grab_screen()
    enq = experience.enqueue(rawstate)
    return enq

# create threads to play the game and collect experience
available_threads = 4
coord = tf.train.Coordinator()
experience_runner = tf.train.QueueRunner(experience,
                                         [perceive(game()) for num in range(available_threads)])

sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = experience_runner.create_threads(sess, coord=coord, start=True)

with sess.as_default():
    while True:
        print(sess.run(experience.dequeue()))
        time.sleep(0.5)
Meanwhile, the game_env looks like this:
import tensorflow as tf

class game(object):
    def __init__(self):
        self.screen_size = [32, 32]
        self.counter = 0

    def grab_screen(self):
        """current screen of the game"""
        self.counter += 1
        screen = self.counter * tf.ones(self.screen_size)
        return screen
As you can see, the game environment is really simple as of now: every time a screen grab is performed, a counter is incremented and an image filled with the counter (of the correct size) is returned.
It should be noted that I wrote the above class just for testing; in general, grab_screen can return any numpy ndarray. Moreover, the real environment is not written by me, so I can only call grab_screen and cannot make any changes inside it.
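For example, two successive calls to grab_screen should produce tensors that evaluate to different values; a quick sketch of the intended behavior:

g = game()
s1 = g.grab_screen()  # evaluates to a 32x32 array filled with 1.0
s2 = g.grab_screen()  # evaluates to a 32x32 array filled with 2.0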
Now, the problem is that the experience queue seems to hold only tensors of ones (i.e., the counter only gets updated once!).
Sample output:
I tensorflow/core/common_runtime/local_device.cc:40] Local device intra op parallelism threads: 4
I tensorflow/core/common_runtime/direct_session.cc:58] Direct session inter op parallelism threads: 4
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
[[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
...,
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]
[ 1. 1. 1. ..., 1. 1. 1.]]
and so on. My question is: how do I dynamically create the input image that gets enqueued into the RandomShuffleQueue, so that each enqueue reflects a fresh screen grab? Thanks!
Upvotes: 2
Views: 950
Reputation: 126184
The problem can be traced to this line, which defines the tf.train.QueueRunner:
experience_runner = tf.train.QueueRunner(
    experience, [perceive(game()) for num in range(available_threads)])
This creates four (available_threads) ops that, each time any of them runs, will enqueue a tensor filled with 1.0 to the experience queue.
Stepping through what happens in the list comprehension should make this clearer. The following happens four times:

1. A new game object is constructed.
2. It is passed to perceive().
3. perceive() calls game.grab_screen() once, which increments the counter and returns the tensor 1 * tf.ones(self.screen_size).
4. perceive() passes this tensor to experience.enqueue() and returns the resulting op.

The QueueRunner.create_threads() call creates one thread per enqueue op, and these run in an infinite loop (blocking when the queue reaches capacity).
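A quick way to see this (a hypothetical check, reusing perceive() and game from the question with a running sess): the counter advances only when perceive() is called from Python, not when the resulting op runs, so running the same op repeatedly enqueues the same value:

g = game()
enq = perceive(g)  # grab_screen() runs once here; g.counter becomes 1
sess.run(enq)      # enqueues a 32x32 tensor of 1s
sess.run(enq)      # enqueues the same tensor of 1s; g.counter is still 1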
To have the desired effect, you should use the feed mechanism and a placeholder to pass a different value for the grabbed screen each time you enqueue an experience. It depends on how your game class is implemented, but you probably also want to initialize a single instance of that class as well. Finally, it's not clear whether you want multiple enqueuing threads, but let's assume that game.grab_screen() is thread-safe and permits some concurrency. Given all this, a plausible version looks like the following (note that you'll need to create a custom thread rather than a QueueRunner to use feeding):
import threading
import time

import tensorflow as tf
from game_env import game

experience = tf.RandomShuffleQueue(10000,
                                   1000, tf.float32,
                                   shapes=[32, 32],
                                   name='experience_replay')

screen_placeholder = tf.placeholder(tf.float32, [32, 32])
# You can create a single enqueue op and dequeued tensor, and reuse these from
# multiple threads.
enqueue_op = experience.enqueue(screen_placeholder)
dequeued_t = experience.dequeue()
# ...

init_op = tf.initialize_all_variables()

game_obj = game()
sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()

# Define a custom thread for running the enqueue op that grabs a new
# screen in a loop and feeds it to the placeholder.
def enqueue_thread():
    with coord.stop_on_exception():
        while not coord.should_stop():
            screen_val = game_obj.grab_screen()
            # Run the same op, but feed a different value for the screen.
            sess.run(enqueue_op, feed_dict={screen_placeholder: screen_val})

available_threads = 4
for _ in range(available_threads):
    threading.Thread(target=enqueue_thread).start()

while True:
    # N.B. It's more efficient to reuse the same dequeue op in a loop.
    print(sess.run(dequeued_t))
    time.sleep(0.5)
Upvotes: 5