Tengyu Liu

Reputation: 1253

Tensorflow QueueRunner with py_func enqueue_op: How to use?

I am trying to use a custom py_func enqueue_op with a TensorFlow RandomShuffleQueue and QueueRunner. I am very new to TensorFlow and am very confused. Here is what I have now:

def compute_data(symbol, time):
    data = np.zeros((1330,))
    return data

key_1 = [str(x) for x in range(3000)]
key_2 = [str(y) for y in range(4800)]
tf_k1 = tf.constant([k for k in key_1])
tf_k2 = tf.constant([k for k in key_2])
tf_k1_index = tf.random_uniform((1,), minval=0, maxval=len(key_1), dtype=tf.int32, name='k1_index')
tf_k2_index = tf.random_uniform((1,), minval=0, maxval=len(key_2), dtype=tf.int32, name='k2_index')
tf_k1_variable = tf.gather_nd(tf_k1, tf_k1_index)
tf_k2_variable = tf.gather_nd(tf_k2, tf_k2_index)
tf_compute_data = tf.py_func(compute_data, [tf_k1_variable, tf_k2_variable], tf.float32, name='py_func_compute_data')

What I'm trying to achieve is this: given two sets of keys, randomly sample one key from each set on every step and generate a piece of data from that pair. The data generation process involves a lot of file reading, which I have left out for now because I want to build the graph properly first.

And here is the rest of the code, which should enqueue the result of tf_compute_data into the queue.

queue = tf.RandomShuffleQueue(
    capacity=20000, 
    min_after_dequeue=2000, 
    dtypes=[tf.float32], 
    shapes=[[1330]], 
    name='data_queue'
    )

enqueue_op = queue.enqueue(tf_compute_data)
tf_data = queue.dequeue_many(batch_size)

...

qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
sv = tf.train.Supervisor(logdir="logdir")
with sv.managed_session(config=config, start_standard_services=True) as sess:
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
        print step

    coord.request_stop()
    coord.join(enqueue_threads)

When I run the script, I get the following error:

W tensorflow/core/framework/op_kernel.cc:993] Out of range: RandomShuffleQueue '_0_data_queue' is closed and has insufficient elements (requested 64, current size 0)
     [[Node: data_queue_DequeueMany = QueueDequeueManyV2[component_types=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](data_queue, data_queue_DequeueMany/n)]]
W tensorflow/core/framework/op_kernel.cc:993] Out of range: RandomShuffleQueue '_0_data_queue' is closed and has insufficient elements (requested 64, current size 0)
     [[Node: data_queue_DequeueMany = QueueDequeueManyV2[component_types=[DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](data_queue, data_queue_DequeueMany/n)]]

When I add logging to the compute_data function, it shows that the function ran only 4 times, once per thread. How do I make it keep running for as long as coord.should_stop() is False?

Upvotes: 1

Views: 349

Answers (1)

Allen Lavoie

Reputation: 5808

Just to summarize the comments, there were two issues:

First, entering a "with tf.Graph().as_default():" block starts over with a new, empty graph, so everything needs to be re-defined in that new graph.

Second, the dtype returned by a py_func is a bit tricky, since numpy defaults to float64 while most TensorFlow functions default to float32. So when defining a py_func it may be necessary to explicitly set the dtype of numpy arrays to float32. There is an error message for this, but I think it got written to a different stream (so if you've arrived at this page for a similar queue error and a py_func dtype mismatch isn't the issue, make sure you check both stdout and stderr for the underlying error).
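
For the dtype point, here is a minimal sketch of what the fix might look like on the numpy side. It only touches the question's compute_data function and assumes the tf.py_func call keeps tf.float32 as its declared output type; the real data-loading code is not shown in the question, so the zeros array is still just a placeholder.

```python
import numpy as np


def compute_data(symbol, time):
    # np.zeros returns float64 unless told otherwise, which does not match
    # the tf.float32 output type declared in the tf.py_func call.
    # Requesting float32 explicitly makes the two agree.
    data = np.zeros((1330,), dtype=np.float32)
    return data


# Quick check outside TensorFlow: compare the default dtype with the fixed one.
default_dtype = np.zeros((1330,)).dtype
fixed_dtype = compute_data(b"0", b"0").dtype
print(default_dtype, fixed_dtype)
```

If the real function builds its array some other way (for example from file contents), converting the result with np.asarray(result, dtype=np.float32) just before returning should have the same effect.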

Upvotes: 1
