Reputation: 1070
I am learning Distributed TensorFlow, and I implemented a simple version of in-graph replication as below (task_parallel.py):
import argparse
import logging

import tensorflow as tf

log = logging.getLogger(__name__)

# Job Names
PARAMETER_SERVER = "ps"
WORKER_SERVER = "worker"

# Cluster Details
CLUSTER_SPEC = {
    PARAMETER_SERVER: ["localhost:2222"],
    WORKER_SERVER: ["localhost:1111", "localhost:1112", "localhost:1113"]}


def parse_command_arguments():
    """ Set up and parse the command line arguments passed for experiment. """
    parser = argparse.ArgumentParser(
        description="Parameters and Arguments for the Test.")
    parser.add_argument(
        "--ps_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--worker_hosts",
        type=str,
        default="",
        help="Comma-separated list of hostname:port pairs"
    )
    parser.add_argument(
        "--job_name",
        type=str,
        default="",
        help="One of 'ps', 'worker'"
    )
    # Flags for defining the tf.train.Server
    parser.add_argument(
        "--task_index",
        type=int,
        default=0,
        help="Index of task within the job"
    )

    return parser.parse_args()


def start_server(
        job_name, ps_hosts, task_index, worker_hosts):
    """ Create a server based on a cluster spec. """
    cluster_spec = {
        PARAMETER_SERVER: ps_hosts,
        WORKER_SERVER: worker_hosts}
    cluster = tf.train.ClusterSpec(cluster_spec)
    server = tf.train.Server(
        cluster, job_name=job_name, task_index=task_index)

    return server


def model():
    """ Build up a simple estimator model. """
    with tf.device("/job:%s/task:0" % PARAMETER_SERVER):
        log.info("111")
        # Build a linear model and predict values
        W = tf.Variable([.3], tf.float32)
        b = tf.Variable([-.3], tf.float32)
        x = tf.placeholder(tf.float32)
        linear_model = W * x + b
        y = tf.placeholder(tf.float32)
        global_step = tf.Variable(0)

    with tf.device("/job:%s/task:0" % WORKER_SERVER):
        # Loss sub-graph
        loss = tf.reduce_sum(tf.square(linear_model - y))
        log.info("222")
        # optimizer
        optimizer = tf.train.GradientDescentOptimizer(0.01)

    with tf.device("/job:%s/task:1" % WORKER_SERVER):
        log.info("333")
        train = optimizer.minimize(loss, global_step=global_step)

    return W, b, loss, x, y, train, global_step


def main():
    # Parse arguments from command line.
    arguments = parse_command_arguments()
    # Initializing logging with level "INFO".
    logging.basicConfig(level=logging.INFO)

    ps_hosts = arguments.ps_hosts.split(",")
    worker_hosts = arguments.worker_hosts.split(",")
    job_name = arguments.job_name
    task_index = arguments.task_index

    # Start a server.
    server = start_server(
        job_name, ps_hosts, task_index, worker_hosts)
    W, b, loss, x, y, train, global_step = model()

    # with sv.prepare_or_wait_for_session(server.target) as sess:
    with tf.train.MonitoredTrainingSession(
            master=server.target,
            is_chief=(arguments.task_index == 0 and (
                arguments.job_name == 'ps')),
            config=tf.ConfigProto(log_device_placement=True)) as sess:
        step = 0
        # training data
        x_train = [1, 2, 3, 4]
        y_train = [0, -1, -2, -3]
        while not sess.should_stop() and step < 1000:
            _, step = sess.run(
                [train, global_step], {x: x_train, y: y_train})

        # evaluate training accuracy
        curr_W, curr_b, curr_loss = sess.run(
            [W, b, loss], {x: x_train, y: y_train})
        print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))


if __name__ == "__main__":
    main()
I ran the code as 3 different processes on a single machine (a Mac Pro, CPU only):
PS: $ python task_parallel.py --task_index 0 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name ps
Worker 1: $ python task_parallel.py --task_index 0 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name worker
Worker 2: $ python task_parallel.py --task_index 1 --ps_hosts localhost:2222 --worker_hosts localhost:1111,localhost:1112 --job_name worker
I noticed that the results were not what I expected. Specifically, I expected the "PS" process to print only 111, "Worker 1" to print only 222, and "Worker 2" to print only 333, since I assigned a task to each process. However, all 3 processes printed exactly the same thing:
INFO:__main__:111
INFO:__main__:222
INFO:__main__:333
Isn't it true that the PS process should only execute the code inside the with tf.device("/job:%s/task:0" % PARAMETER_SERVER) block, and the same for the workers? I wonder if I missed something in my code.
I also found that I had to start all the worker processes first and the ps process afterwards; otherwise, the worker processes could not exit gracefully after training finished. So I would like to know the reason for this behavior in my code. Any help is really appreciated :) Thanks!
Upvotes: 2
Views: 555
Reputation: 56
Please note that, in your snippet, the code before MonitoredTrainingSession only describes and builds the graph; both the parameter server and the workers execute that code to construct the graph, so the log.info calls run in every process at graph-construction time. A tf.device block only controls where the resulting operations are placed when the graph is executed, not which process runs the Python statements. The graph is frozen when the MonitoredTrainingSession is created.
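As a small illustration (a minimal sketch reusing the ps device block from your model()): the device string is only attached to the op, while the Python statements inside the block still execute in every process that builds the graph:

with tf.device("/job:%s/task:0" % PARAMETER_SERVER):
    W = tf.Variable([.3], tf.float32)

# The annotation is recorded on the op, but this print runs in the ps
# process AND in every worker that builds the same graph.
print(W.device)   # "/job:ps/task:0"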
If you want to see 111 only in the PS process, your code could work like this:
FLAGS = tf.app.flags.FLAGS
if FLAGS.job_name == 'ps':
    print('111')
    server.join()
else:
    print('222')
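As a fuller sketch of how that branch could fit into the main() from your snippet (reusing your parse_command_arguments(), start_server(), and model(); note that making worker 0 the chief, instead of the ps, is an assumption on my part):

def main():
    arguments = parse_command_arguments()
    logging.basicConfig(level=logging.INFO)
    server = start_server(
        arguments.job_name, arguments.ps_hosts.split(","),
        arguments.task_index, arguments.worker_hosts.split(","))

    if arguments.job_name == PARAMETER_SERVER:
        log.info("111")   # only the ps process reaches this line
        server.join()     # block forever, serving variables to the workers
    else:
        log.info("222")   # only the worker processes reach this line
        W, b, loss, x, y, train, global_step = model()
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(arguments.task_index == 0)) as sess:
            step = 0
            x_train, y_train = [1, 2, 3, 4], [0, -1, -2, -3]
            while not sess.should_stop() and step < 1000:
                _, step = sess.run(
                    [train, global_step], {x: x_train, y: y_train})

With this structure, the ps process never builds the training graph at all; it only joins the server and waits.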
If you want to set up a replicated model on the workers, in the model() function:
with tf.device('/job:ps/task:0'):
    # define the variables on the parameter server
    ...
with tf.device('/job:worker/task:%d' % FLAGS.task_index):
    # define the model on the worker with this task_index
    ...
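For instance, a minimal sketch of the model() from your snippet rewritten along these lines (hypothetical: the task index is passed in as an argument rather than read from FLAGS):

def model(task_index):
    """ Variables live on the ps; each worker builds its own compute ops. """
    with tf.device("/job:%s/task:0" % PARAMETER_SERVER):
        W = tf.Variable([.3], tf.float32)
        b = tf.Variable([-.3], tf.float32)
        global_step = tf.Variable(0)

    with tf.device("/job:%s/task:%d" % (WORKER_SERVER, task_index)):
        x = tf.placeholder(tf.float32)
        y = tf.placeholder(tf.float32)
        linear_model = W * x + b
        loss = tf.reduce_sum(tf.square(linear_model - y))
        optimizer = tf.train.GradientDescentOptimizer(0.01)
        train = optimizer.minimize(loss, global_step=global_step)

    return W, b, loss, x, y, train, global_step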
Additionally, tf.train.replica_device_setter will automatically assign devices to Operation objects as they are constructed.
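For example, a minimal sketch using the cluster spec from your snippet (here task_index stands for the worker's --task_index flag):

cluster = tf.train.ClusterSpec({
    PARAMETER_SERVER: ["localhost:2222"],
    WORKER_SERVER: ["localhost:1111", "localhost:1112"]})

with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % task_index,
        cluster=cluster)):
    # Variables (W, b, global_step) are placed on the ps job automatically;
    # the remaining ops default to this worker's device.
    W = tf.Variable([.3], tf.float32)
    b = tf.Variable([-.3], tf.float32)
    x = tf.placeholder(tf.float32)
    y = tf.placeholder(tf.float32)
    loss = tf.reduce_sum(tf.square(W * x + b - y))
    global_step = tf.Variable(0)
    train = tf.train.GradientDescentOptimizer(0.01).minimize(
        loss, global_step=global_step)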
There are some examples provided by TensorFlow, such as:
hello distributed, a basic guide in the TensorFlow tutorial.
mnist_replica.py, distributed MNIST training and validation with model replicas.
I hope this helps you.
Upvotes: 2