Ruofan Kong

Reputation: 1070

In-graph replication for Distributed Tensorflow

I am learning Distributed Tensorflow, and I implemented a simple version code of In-graph replication as below (task_parallel.py):

import argparse
import logging

import tensorflow as tf


log = logging.getLogger(__name__)

# Job Names
PARAMETER_SERVER = "ps"
WORKER_SERVER = "worker"

# Cluster Details
CLUSTER_SPEC = {
    PARAMETER_SERVER: ["localhost:2222"],
    WORKER_SERVER: ["localhost:1111", "localhost:1112", "localhost:1113"]}


def parse_command_arguments():
    """ Set up and parse the command line arguments passed for experiment. """
    parser = argparse.ArgumentParser(
        description="Parameters and Arguments for the Test.")

    parser.add_argument(
        "--ps_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--worker_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    return parser.parse_args()


def start_server(
        job_name, ps_hosts, task_index, worker_hosts):
    """ Create a server based on a cluster spec. """
    cluster_spec = {
        PARAMETER_SERVER: ps_hosts,
        WORKER_SERVER: worker_hosts}
    cluster = tf.train.ClusterSpec(cluster_spec)

    server = tf.train.Server(
        cluster, job_name=job_name, task_index=task_index)

    return server


def model():
    """ Build up a simple estimator model. """
    with tf.device("/job:%s/task:0" % PARAMETER_SERVER):
        log.info("111")
        # Build a linear model and predict values
        W = tf.Variable([.3], tf.float32)
        b = tf.Variable([-.3], tf.float32)
        x = tf.placeholder(tf.float32)
        linear_model = W * x + b
        y = tf.placeholder(tf.float32)
        global_step = tf.Variable(0)

    with tf.device("/job:%s/task:0" % WORKER_SERVER):
        # Loss sub-graph
        loss = tf.reduce_sum(tf.square(linear_model - y))
        log.info("222")
        # optimizer
        optimizer = tf.train.GradientDescentOptimizer(0.01)

    with tf.device("/job:%s/task:1" % WORKER_SERVER):
        log.info("333")
        train = optimizer.minimize(loss, global_step=global_step)

    return W, b, loss, x, y, train, global_step


def main():
    # Parse arguments from command line.
    arguments = parse_command_arguments()

    # Initializing logging with level "INFO".
    logging.basicConfig(level=logging.INFO)

    ps_hosts = arguments.ps_hosts.split(",")
    worker_hosts = arguments.worker_hosts.split(",")
    job_name = arguments.job_name
    task_index = arguments.task_index

    # Start a server.
    server = start_server(
        job_name, ps_hosts, task_index, worker_hosts)

    W, b, loss, x, y, train, global_step = model()
    # with sv.prepare_or_wait_for_session(server.target) as sess:
    with tf.train.MonitoredTrainingSession(
            master=server.target,
            is_chief=(arguments.task_index == 0 and (
                        arguments.job_name == 'ps')),
            config=tf.ConfigProto(log_device_placement=True)) as sess:
        step = 0
        # training data
        x_train = [1, 2, 3, 4]
        y_train = [0, -1, -2, -3]
        while not sess.should_stop() and step < 1000:
            _, step = sess.run(
                [train, global_step], {x: x_train, y: y_train})

        # evaluate training accuracy
        curr_W, curr_b, curr_loss = sess.run(
            [W, b, loss], {x: x_train, y: y_train})
        print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))

if __name__ == "__main__":
    main()

I ran the code as 3 different processes on a single machine (a Mac Pro, CPU only):

PS: $python task_parallel.py --task_index 0 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name ps

Worker 1: $python task_parallel.py --task_index 0 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name worker

Worker 2: $python task_parallel.py --task_index 1 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name worker

I noticed that the results were not what I expected. Specifically, I expected the "PS" process to print only 111, "Worker 1" to print only 222, and "Worker 2" to print only 333, since I assigned a task to each process. However, all 3 processes printed exactly the same thing:

INFO:__main__:111
INFO:__main__:222
INFO:__main__:333

Isn't it true that the PS process should only execute the code inside the block with tf.device("/job:%s/task:0" % PARAMETER_SERVER)? And likewise for the workers? I wonder if I missed something in my code.

I also found that I had to start all the worker processes first and the ps process afterwards. Otherwise, the worker processes could not exit gracefully after training finished. I would like to know the reason for this behavior in my code. Any help is really appreciated :) Thanks!

Upvotes: 2

Views: 555

Answers (1)

Jinmian Ye

Reputation: 56

Please note that, in your snippet, the code before MonitoredTrainingSession only describes and builds the computation graph: both the parameter server and the workers execute that code to construct their copy of the graph. The graph is then frozen when the MonitoredTrainingSession is created.
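
In other words, the Python statements inside your with tf.device(...) blocks (including the log.info calls) run at graph-construction time in every process; the device scope only annotates where the resulting ops will execute once a session runs them. A rough illustration, reusing the names from your snippet:

# This block is executed by every process that builds the graph:
with tf.device("/job:ps/task:0"):
    log.info("111")                    # plain Python, runs immediately, in every process
    W = tf.Variable([.3], tf.float32)  # merely *pinned* to the ps task for later
# W is actually placed on /job:ps/task:0 only when a session executes the graph,
# no matter which process constructed it.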

If you want to see 111 printed only in the PS process, your code could look like this:

FLAGS = tf.app.flags.FLAGS
if FLAGS.job_name == 'ps':
    print('111')
    server.join()
else:
    print('222')
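
Since your script parses its flags with argparse rather than tf.app.flags, the equivalent check in your main() would be roughly this (a sketch, reusing the arguments and server variables you already have):

if arguments.job_name == PARAMETER_SERVER:
    log.info("111")
    server.join()   # the ps process blocks here and serves variables to the workers
else:
    log.info("222")
    # build the model and open the MonitoredTrainingSession here, as before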

If you want to set up a replicated model across the workers, do this in the model() function:

with tf.device('/job:ps/task:0'):
    # define the variables on the parameter server
with tf.device('/job:worker/task:%d' % FLAGS.task_index):
    # define the model on the worker with the given task_index
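
For example, adapting the linear model from your question, that layout could look roughly like this (a sketch; FLAGS.task_index stands in for however you read the task index):

with tf.device('/job:ps/task:0'):
    # Shared parameters live on the parameter server.
    W = tf.Variable([.3], dtype=tf.float32)
    b = tf.Variable([-.3], dtype=tf.float32)
    global_step = tf.Variable(0, trainable=False)

with tf.device('/job:worker/task:%d' % FLAGS.task_index):
    # Each worker builds its own copy of the loss and training op.
    x = tf.placeholder(tf.float32)
    y = tf.placeholder(tf.float32)
    loss = tf.reduce_sum(tf.square(W * x + b - y))
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
        loss, global_step=global_step)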

Additionally, tf.train.replica_device_setter will automatically assign devices to Operation objects as they are constructed.
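
For instance (a sketch, assuming the CLUSTER_SPEC and task index from your script):

cluster = tf.train.ClusterSpec(CLUSTER_SPEC)
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # Variables are assigned to the ps job (round-robin over ps tasks);
    # all other ops stay on the local worker task.
    W = tf.Variable([.3], dtype=tf.float32)
    b = tf.Variable([-.3], dtype=tf.float32)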

There are some examples provided by TensorFlow, such as:

  1. hello distributed, a basic guide in the TensorFlow tutorials.

  2. mnist_replica.py, a distributed MNIST training and validation example with model replicas.

  3. cifar10_multi_gpu_train.py, a binary to train CIFAR-10 using multiple GPUs with synchronous updates.

Hope this helps.

Upvotes: 2
