Kapil Mathur

Reputation: 21

Sharing an array, list, or variable between two distributed TensorFlow processes

I am presently working on distributed TensorFlow with 2 worker processes and am facing the issue of sharing a variable between these two worker processes. I found tf.get_collection/tf.add_to_collection but am still unable to get the variable value shared between the 2 processes.

Adding a few details about how I want to share the data among the worker processes in distributed TensorFlow:

def create_variable(layer_shape):
        with tf.variable_scope("share_lay"):
                layers = tf.get_variable("layers", shape=layer_shape, trainable=True)
        with tf.variable_scope("share_lay", reuse=tf.AUTO_REUSE):
                layers = tf.get_variable("layers", shape=layer_shape, trainable=True)
        return layers

def set_layer(layers):
        tf.add_to_collection("layers", layers)

def get_layer(name):
        return tf.get_collection(name)[0]


if taskid == 0:
    layers = create_variable(layer_shape)
    layers = <some value>
    set_layer(layers)
elif taskid == 1:
    layers = create_variable(layer_shape)
    layers = get_layer("layers")

I am getting an error when calling get_layer():

return tf.get_collection(name)[0]

IndexError: list index out of range

It appears that the data cannot be shared between the workers. Any suggestions / pointers are appreciated.

Thanks, Kapil

Upvotes: 2

Views: 928

Answers (1)

Luochao Wang

Reputation: 123

I finally solved the same problem by using tf.train.replica_device_setter() to place the variables on the parameter server and add them to a collection. Later, I can use tf.get_collection() in any worker to return that collection, which is actually a Python list. Note that tf.get_collection only returns a copy of the original collection. If you want to change the variables in the original collection, you should use tf.get_collection_ref, which actually returns the collection list itself.
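To make the copy-vs-reference point concrete before the full distributed example, here is a minimal single-process sketch (my illustration, not part of the worked example below):

import tensorflow as tf

# 'a' is added both to the default global-variables collection and to a
# custom collection named 'shared_list'.
a = tf.Variable(1.0, name='a',
                collections=[tf.GraphKeys.GLOBAL_VARIABLES, 'shared_list'])

copied = tf.get_collection('shared_list')      # a copy of the collection list
copied.append('ignored')                       # the graph's collection is unchanged

actual = tf.get_collection_ref('shared_list')  # the collection list itself
actual.append(tf.Variable(2.0, name='b'))      # this does modify the collection

print(len(tf.get_collection('shared_list')))   # prints 2: 'a' and 'b'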

Here is an example:

import tensorflow as tf

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('job_name', '',
                           """One of 'ps', 'worker' """)
tf.app.flags.DEFINE_integer('task_index', 0,
                           """Index of task within the job""")

cluster = tf.train.ClusterSpec(
    {'ps': ['localhost:22222'],
    'worker': ['localhost:22223', 'localhost:22227']})
config = tf.ConfigProto(
            intra_op_parallelism_threads=1,
            inter_op_parallelism_threads=1)
if FLAGS.job_name == 'ps':
    server = tf.train.Server(cluster, job_name='ps', task_index=FLAGS.task_index, config=config)
    server.join()
else:
    server = tf.train.Server(cluster, job_name='worker', task_index=FLAGS.task_index, config=config)
    with tf.device(tf.train.replica_device_setter(cluster=cluster)):
        #create a collection 'shared_list' and add two variables to the collection 'shared_list'
        #note that these two variables are placed on parameter server
        a = tf.Variable(name='a', initial_value=tf.constant(1.0), 
                        collections=[tf.GraphKeys.GLOBAL_VARIABLES, 'shared_list'])

        b = tf.Variable(name='b', initial_value=tf.constant(2.0), 
                        collections=[tf.GraphKeys.GLOBAL_VARIABLES, 'shared_list'])

    #now let's print out the value of a+2.0 and b+2.0 using the collection 'shared_list' from different workers
    #note that tf.get_collection will return a copy of the existing collection, which is actually a python list
    with tf.device('/job:worker/task:%d' %FLAGS.task_index):
        c = tf.get_collection('shared_list')[0] + 2.0    # a+2.0
        d = tf.get_collection('shared_list')[1] + 2.0    # b+2.0


    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index==0),
                                           config=config) as sess:
        print('this is worker %d' % FLAGS.task_index)
        print(c.eval(session=sess))
        print(d.eval(session=sess))
        server.join()

worker 0 will print out:

this is worker 0
3.0
4.0

worker 1 will print out:

this is worker 1
3.0
4.0
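Mapping this back to the helpers in the question, here is a hedged sketch (my adaptation, not code from this answer; the extra cluster argument is mine) of how create_variable and get_layer could be rewritten so that the shared variable lives on the parameter server:

import tensorflow as tf

def create_variable(layer_shape, cluster):
    # replica_device_setter pins variables to the ps job, so every worker's
    # graph refers to the same underlying variable on the parameter server.
    with tf.device(tf.train.replica_device_setter(cluster=cluster)):
        with tf.variable_scope("share_lay", reuse=tf.AUTO_REUSE):
            layers = tf.get_variable("layers", shape=layer_shape, trainable=True)
    tf.add_to_collection("layers", layers)
    return layers

def get_layer(name):
    # Collections are per-process, so each worker has to build the graph above
    # itself; the variable's value is then served from the ps task.
    return tf.get_collection(name)[0]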

Edit: worker 0 modifies the variable 'a' to 10, and then worker 1 prints out the new value of 'a', which becomes 10 immediately. Variable 'a' is available to both worker 0 and worker 1 because they are in a distributed setting. Below is an example. Also refer to this blog in Amid Fish by Matthew Rahtz for how to share variables in distributed TensorFlow. Actually, we don't need any parameter server to share variables: any two workers can share the same variable with each other, as long as the two workers create two variables with exactly the same name.

Here is the example:

import tensorflow as tf
from time import sleep

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('job_name', '',
                           """One of 'ps', 'worker' """)
tf.app.flags.DEFINE_integer('task_index', 0,
                            """Index of task within the job""")

cluster = tf.train.ClusterSpec(
    {'ps': ['localhost:22222'],
     'worker': ['localhost:22223', 'localhost:22227']})

if FLAGS.job_name == 'ps':
    server = tf.train.Server(cluster, job_name='ps', task_index=FLAGS.task_index)
    server.join()
else:
    server = tf.train.Server(cluster, job_name='worker', task_index=FLAGS.task_index)
    with tf.device(tf.train.replica_device_setter(cluster=cluster)):
        # create a collection 'shared_list' and add two variables to the collection 'shared_list'
        # note that these two variables are placed on parameter server
        a = tf.Variable(name='a', initial_value=tf.constant(1.0),
                        collections=[tf.GraphKeys.GLOBAL_VARIABLES, 'shared_list'])

        b = tf.Variable(name='b', initial_value=tf.constant(2.0),
                        collections=[tf.GraphKeys.GLOBAL_VARIABLES, 'shared_list'])

    # change the value of 'a' in worker 0
    if FLAGS.task_index == 0:
        change_a = a.assign(10)

    # print out the new value of a in worker 1 using get_collection. Note that we may need to
    # use read_value() method to force the op to read the current value of a
    if FLAGS.task_index == 1:
        with tf.device('/job:worker/task:1'):  # place read_a to worker 1
             read_a = tf.get_collection('shared_list')[0].read_value()  # a = 10

    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0)) as sess:
        if FLAGS.task_index == 0:
            sess.run(change_a)

        if FLAGS.task_index == 1:
            sleep(1)  # sleep a little bit to wait until change_a has been executed
            print(read_a.eval(session=sess))
        server.join()

worker 1 prints out:

10
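And, as the edit above notes, a parameter server is not strictly required. Here is a minimal hedged sketch (my own illustration of the idea described in the Amid Fish post, not code from that post or from this answer) of two workers sharing one variable by pinning it to worker 0 under the same name:

import tensorflow as tf

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_integer('task_index', 0,
                            """Index of task within the job""")

# No ps job at all: only two workers.
cluster = tf.train.ClusterSpec(
    {'worker': ['localhost:22223', 'localhost:22227']})
server = tf.train.Server(cluster, job_name='worker', task_index=FLAGS.task_index)

# Both processes build the same graph and pin the variable to worker 0, so the
# runtime resolves the identically named variable to one copy hosted by worker 0.
with tf.device('/job:worker/task:0'):
    shared = tf.get_variable('shared', initializer=tf.constant(0.0))

increment = shared.assign_add(1.0)

with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=(FLAGS.task_index == 0)) as sess:
    sess.run(increment)
    # Depending on timing, prints 1.0 or 2.0; both workers see the same variable.
    print(sess.run(shared))
    server.join()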

Upvotes: 1
