MiniQuark

Reputation: 48446

How can I push tensors to a TensorFlow queue and pull them from another process?

I have a TensorFlow cluster up and running, and I'm trying to enqueue data from one client process and dequeue it from another. I can't get this to work. What am I doing wrong?

Here's my program to push data:

# queue_push.py
import tensorflow as tf
import time

with tf.container("qtest"):
    q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32],
                     shapes=[[]], name="q")
    v = tf.placeholder(tf.float32, shape=())
    enqueue = q.enqueue([v])

with tf.Session("grpc://localhost:2222") as sess:
    while True:
        t = time.time()
        print(t)
        sess.run(enqueue, feed_dict={v: t})
        time.sleep(1)

And my program to pull data:

# queue_pull.py
import tensorflow as tf
import time

with tf.container("qtest"):
    q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32],
                     shapes=[[]], name="q")
    dequeue = q.dequeue()

with tf.Session("grpc://localhost:2222") as sess:
    while True:
        v = sess.run(dequeue)
        print("Pulled:", v)
        time.sleep(0.5)

When I run them, here's what I get:

$ python queue_push.py
1472420887.974484
1472420888.991067
1472420889.995756
1472420890.998365
1472420892.001799
1472420893.008567
1472420894.011109
1472420895.014532
1472420896.02017
1472420897.024806
1472420898.03187
(then blocked forever)

And in parallel:

$ python queue_pull.py
(blocked forever)

Upvotes: 4

Views: 1295

Answers (1)

mrry

Reputation: 126154

To share a tf.FIFOQueue (or other TensorFlow queue) between multiple sessions, you need to pass the optional shared_name argument to the constructor, and set it to the same string in each instance. For example, you could create q as follows in the two scripts:

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[]],
                 shared_name="shared_q", name="q")

The shared_name does not have to be the same as the name for the queue, but it does need to be unique among all other queues created in the same container on the same device. You can share queues without using a with tf.container(): block; the tf.container() block provides a way of grouping stateful objects so that they can be cleared selectively (using tf.Session.reset()).
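As a rough, self-contained illustration of the above, the sketch below shares one queue between two sessions inside a single process. It makes a few assumptions that are not in the question: it imports `tensorflow.compat.v1` (available in TF 1.13+ and 2.x) so the same names work on newer installs, it uses an in-process server from `tf.train.Server.create_local_server()` as a stand-in for the real cluster, and `build_queue` is just a hypothetical helper. Each graph/session pair plays the role of one of the two scripts.

```python
# A minimal sketch, not the asker's exact setup.
# Assumption: tensorflow.compat.v1 is importable (TF 1.13+ or TF 2.x).
import tensorflow.compat.v1 as tf

# Assumption: an in-process, single-task server stands in for the cluster.
server = tf.train.Server.create_local_server()


def build_queue():
    # Hypothetical helper: same container and same shared_name in every
    # graph, so every session refers to the same underlying queue.
    with tf.container("qtest"):
        return tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[]],
                            shared_name="shared_q", name="q")


# Graph that plays the role of queue_push.py.
push_graph = tf.Graph()
with push_graph.as_default():
    v = tf.placeholder(tf.float32, shape=())
    enqueue = build_queue().enqueue([v])

# Graph that plays the role of queue_pull.py.
pull_graph = tf.Graph()
with pull_graph.as_default():
    dequeue = build_queue().dequeue()

# Two independent sessions connected to the same server.
with tf.Session(server.target, graph=push_graph) as push_sess, \
     tf.Session(server.target, graph=pull_graph) as pull_sess:
    for x in [1.0, 2.0, 3.0]:
        push_sess.run(enqueue, feed_dict={v: x})
    # The second session sees the elements enqueued by the first one.
    pulled = [float(pull_sess.run(dequeue)) for _ in range(3)]

print(pulled)
```

If you drop `shared_name` from `build_queue`, the dequeue in the second session should block, which matches the behavior described in the question.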


N.B. The default sharing behavior is different for tf.Variable objects, which are shared by default based on their name property. All other shareable objects (queues, readers, etc.) in TensorFlow graphs are shared only when you set the shared_name argument in their constructors.
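To make the contrast with variables concrete, here is a small sketch under the same kind of assumptions as before: `tensorflow.compat.v1` is importable, an in-process server from `tf.train.Server.create_local_server()` stands in for the cluster, and the variable name "counter" and the helper `build_counter` are made up for illustration. No shared_name is passed anywhere; the two graphs only agree on the variable's name.

```python
# A minimal sketch of name-based variable sharing across sessions.
# Assumption: tensorflow.compat.v1 is importable (TF 1.13+ or TF 2.x).
import tensorflow.compat.v1 as tf

# Assumption: an in-process, single-task server stands in for the cluster.
server = tf.train.Server.create_local_server()


def build_counter():
    # Hypothetical helper: both graphs create a variable with the same name.
    return tf.Variable(42.0, name="counter")


# First graph: creates and initializes the variable.
writer_graph = tf.Graph()
with writer_graph.as_default():
    init = build_counter().initializer

# Second graph: declares a variable with the same name and only reads it.
reader_graph = tf.Graph()
with reader_graph.as_default():
    read = build_counter().read_value()

with tf.Session(server.target, graph=writer_graph) as writer_sess, \
     tf.Session(server.target, graph=reader_graph) as reader_sess:
    writer_sess.run(init)
    # The reader never runs an initializer; it sees the writer's state.
    value = float(reader_sess.run(read))

print(value)
```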

Upvotes: 5
