Reputation: 1253
I am trying to use a custom py_func enqueue_op with a TensorFlow RandomShuffleQueue and QueueRunner. I am very new to TensorFlow and am very confused. Here is what I have now:
import numpy as np
import tensorflow as tf

def compute_data(symbol, time):
    data = np.zeros((1330,))
    return data

k1 = [str(x) for x in range(3000)]
k2 = [str(y) for y in range(4800)]

tf_k1 = tf.constant(k1)
tf_k2 = tf.constant(k2)

tf_k1_index = tf.random_uniform((1,), minval=0, maxval=len(k1), dtype=tf.int32, name='k1_index')
tf_k2_index = tf.random_uniform((1,), minval=0, maxval=len(k2), dtype=tf.int32, name='k2_index')

tf_k1_variable = tf.gather_nd(tf_k1, tf_k1_index)
tf_k2_variable = tf.gather_nd(tf_k2, tf_k2_index)

tf_compute_data = tf.py_func(compute_data, [tf_k1_variable, tf_k2_variable], tf.float32, name='py_func_compute_data')
Basically, what I'm trying to achieve is this: given two sets of keys, randomly sample a combination of the two keys each time and generate a piece of data from those two keys. The data generation process involves a lot of file reading and is skipped for now, because I want to build the graph properly first.
And here is the rest of the code that should enqueue the result of tf_compute_data into queue:
queue = tf.RandomShuffleQueue(
    capacity=20000,
    min_after_dequeue=2000,
    dtypes=[tf.float32],
    shapes=[[1330]],
    name='data_queue'
)
enqueue_op = queue.enqueue(tf_compute_data)
tf_data = queue.dequeue_many(batch_size)
...
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
sv = tf.train.Supervisor(logdir="logdir")
with sv.managed_session(config=config, start_standard_services=True) as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
        print step
    coord.request_stop()
    coord.join(enqueue_threads)
When I run the script, the following error shows up:
W tensorflow/core/framework/op_kernel.cc:993] Out of range: RandomShuffleQueue '_0_data_queue' is closed and has insufficient elements (requested 64, current size 0)
[[Node: data_queue_DequeueMany = QueueDequeueManyV2[component_types=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](data_queue, data_queue_DequeueMany/n)]]
W tensorflow/core/framework/op_kernel.cc:993] Out of range: RandomShuffleQueue '_0_data_queue' is closed and has insufficient elements (requested 64, current size 0)
[[Node: data_queue_DequeueMany = QueueDequeueManyV2[component_types=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](data_queue, data_queue_DequeueMany/n)]]
When I add logging to the compute_data function, it shows that it only ran 4 times, once per thread. How do I make it run for as long as coord.should_stop() is False?
Upvotes: 1
Views: 349
Reputation: 5808
Just to summarize the comments, there were two issues:
First, a with tf.Graph().as_default() block starts things over from scratch, so everything needs to be re-defined in that new graph.
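For example, a small self-contained illustration of that graph-scoping point (not the asker's code; the constants are just placeholders):

import tensorflow as tf

c = tf.constant(1.0)                # created in the old default graph

with tf.Graph().as_default() as g:
    d = tf.constant(2.0)            # created in the new graph g
    assert d.graph is g
    assert c.graph is not g         # ops created earlier do not carry over

    with tf.Session(graph=g) as sess:
        print(sess.run(d))          # works: d belongs to this graph
        # sess.run(c) would raise an error: c is not in this graph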
Second, the dtype returned by a py_func is a bit tricky, since numpy defaults to float64 while most TensorFlow functions default to float32. So when defining a py_func it may be necessary to explicitly set the dtype of the numpy arrays to float32. There is an error message for this, but I think it got written to a different stream (so if you've arrived at this page for a similar queue error and a py_func dtype mismatch isn't the issue, make sure you check both stdout and stderr for the underlying error).
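For reference, a minimal sketch of that dtype fix, assuming a compute_data like the one in the question (the placeholders below just stand in for tf_k1_variable / tf_k2_variable from the asker's graph):

import numpy as np
import tensorflow as tf

def compute_data(symbol, time):
    # dtype=np.float32 matches the tf.float32 declared in the py_func below;
    # np.zeros defaults to float64, which makes the py_func op fail at runtime.
    return np.zeros((1330,), dtype=np.float32)

symbol = tf.placeholder(tf.string, shape=())   # stand-in for tf_k1_variable
time = tf.placeholder(tf.string, shape=())     # stand-in for tf_k2_variable

tf_compute_data = tf.py_func(compute_data, [symbol, time], tf.float32,
                             name='py_func_compute_data')
# py_func output shape is unknown by default; set it so the queue's
# shapes=[[1330]] check passes when enqueueing.
tf_compute_data.set_shape([1330])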
Upvotes: 1