Reputation: 21
I changed the PTB model (you can find it in tensorflow/models/tutorials/rnn/ptb) to a between-graph replicated version, but this distributed version (with 1 ps server and 2 workers) shows no speedup, even when the ps and workers run on a single machine. Timeline profiling shows a significant delay between GPU jobs and CPU jobs in the distributed version. The code and timeline graphs are below:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import numpy as np
import tensorflow as tf

import reader
import tempfile

flags = tf.flags
logging = tf.logging

flags.DEFINE_string(
    "model", "small",
    "A type of model. Possible options are: small, medium, large.")
flags.DEFINE_string("data_path", None,
                    "Where the training/test data is stored.")
flags.DEFINE_string("save_path", None,
                    "Model output directory.")
flags.DEFINE_bool("use_fp16", False,
                  "Train using 16-bit floats instead of 32bit floats")
flags.DEFINE_string("ps_hosts", "IP1:2222",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "IP1:2223,IP1:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None, "job name: worker or ps")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task that performs the variable "
                     "initialization")
flags.DEFINE_integer("num_gpus", 1,
                     "Total number of gpus for each machine. "
                     "If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update "
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_boolean("sync_replicas", False,
                     "Use the sync_replicas (synchronized replicas) mode, "
                     "wherein the parameter updates from workers are aggregated "
                     "before being applied to avoid stale gradients")
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exist. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")

FLAGS = flags.FLAGS


def data_type():
  return tf.float16 if FLAGS.use_fp16 else tf.float32
class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, ix, worker_num, name=None):
    # Each worker reads a contiguous 1/worker_num shard of the data.
    data_len = len(data) // worker_num
    data = data[data_len * ix:data_len * (ix + 1)]
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)
class PTBModel(object):
  """The PTB model."""

  def __init__(self, is_training, config, input_, num_workers=0, global_step=None):
    self._input = input_

    batch_size = input_.batch_size
    num_steps = input_.num_steps
    size = config.hidden_size
    vocab_size = config.vocab_size

    # Slightly better results can be obtained with forget gate biases
    # initialized to 1 but the hyperparameters of the model would need to be
    # different than reported in the paper.
    def lstm_cell():
      # return tf.contrib.rnn.BasicLSTMCell(
      return tf.nn.rnn_cell.BasicLSTMCell(
          size, forget_bias=0.0, state_is_tuple=True)
    attn_cell = lstm_cell
    if is_training and config.keep_prob < 1:
      def attn_cell():
        return tf.contrib.rnn.DropoutWrapper(
            lstm_cell(), output_keep_prob=config.keep_prob)
    # cell = tf.contrib.rnn.MultiRNNCell(
    cell = tf.nn.rnn_cell.MultiRNNCell(
        [attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)

    self._initial_state = cell.zero_state(batch_size, data_type())

    with tf.device("/cpu:0"):
      embedding = tf.get_variable(
          "embedding", [vocab_size, size], dtype=data_type())
      inputs = tf.nn.embedding_lookup(embedding, input_.input_data)

    if is_training and config.keep_prob < 1:
      inputs = tf.nn.dropout(inputs, config.keep_prob)

    # Simplified version of models/tutorials/rnn/rnn.py's rnn().
    # This builds an unrolled LSTM for tutorial purposes only.
    # In general, use the rnn() or state_saving_rnn() from rnn.py.
    #
    # The alternative version of the code below is:
    #
    # inputs = tf.unstack(inputs, num=num_steps, axis=1)
    # outputs, state = tf.nn.rnn(cell, inputs,
    #                            initial_state=self._initial_state)
    outputs = []
    state = self._initial_state
    with tf.variable_scope("RNN"):
      for time_step in range(num_steps):
        if time_step > 0: tf.get_variable_scope().reuse_variables()
        (cell_output, state) = cell(inputs[:, time_step, :], state)
        outputs.append(cell_output)

    output = tf.reshape(tf.concat(1, outputs), [-1, size])
    softmax_w = tf.get_variable(
        "softmax_w", [size, vocab_size], dtype=data_type())
    softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
    logits = tf.matmul(output, softmax_w) + softmax_b
    # loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
    loss = tf.nn.seq2seq.sequence_loss_by_example(
        [logits],
        [tf.reshape(input_.targets, [-1])],
        [tf.ones([batch_size * num_steps], dtype=data_type())])
    self._cost = cost = tf.reduce_sum(loss) / batch_size
    self._final_state = state

    if not is_training:
      return

    self._lr = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
                                      config.max_grad_norm)
    self._opt = tf.train.GradientDescentOptimizer(self._lr)
    if FLAGS.sync_replicas:
      if FLAGS.replicas_to_aggregate is None:
        replicas_to_aggregate = num_workers
      else:
        replicas_to_aggregate = FLAGS.replicas_to_aggregate
      self._opt = tf.train.SyncReplicasOptimizer(
          self._opt,
          replicas_to_aggregate=replicas_to_aggregate,
          total_num_replicas=num_workers,
          name="ptb_sync_replicas")
    # train_step = opt.minimize(cross_entropy, global_step=global_step)
    self._train_op = self._opt.apply_gradients(
        zip(grads, tvars),
        global_step)
    # global_step=tf.contrib.framework.get_or_create_global_step())

    self._new_lr = tf.placeholder(
        tf.float32, shape=[], name="new_learning_rate")
    self._lr_update = tf.assign(self._lr, self._new_lr)

  def assign_lr(self, session, lr_value):
    session.run(self._lr_update, feed_dict={self._new_lr: lr_value})

  @property
  def input(self):
    return self._input

  @property
  def initial_state(self):
    return self._initial_state

  @property
  def cost(self):
    return self._cost

  @property
  def final_state(self):
    return self._final_state

  @property
  def lr(self):
    return self._lr

  @property
  def opt(self):
    return self._opt

  @property
  def train_op(self):
    return self._train_op
class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000


class MediumConfig(object):
  """Medium config."""
  init_scale = 0.05
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 35
  hidden_size = 650
  max_epoch = 6
  max_max_epoch = 39
  keep_prob = 0.5
  lr_decay = 0.8
  batch_size = 20
  vocab_size = 10000


class LargeConfig(object):
  """Large config."""
  init_scale = 0.04
  learning_rate = 1.0
  max_grad_norm = 10
  num_layers = 2
  num_steps = 35
  hidden_size = 1500
  max_epoch = 14
  max_max_epoch = 55
  keep_prob = 0.35
  lr_decay = 1 / 1.15
  batch_size = 20
  vocab_size = 10000


class TestConfig(object):
  """Tiny config, for testing."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 1
  num_layers = 1
  num_steps = 2
  hidden_size = 2
  max_epoch = 1
  max_max_epoch = 1
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000
def run_epoch(session, model, global_step, eval_op=None, verbose=False):
  """Runs the model on the given data."""
  start_time = time.time()
  costs = 0.0
  iters = 0
  state = session.run(model.initial_state)

  fetches = {
      "cost": model.cost,
      "final_state": model.final_state,
      "global_step": global_step,
  }
  if eval_op is not None:
    fetches["eval_op"] = eval_op

  for step in range(model.input.epoch_size):
    feed_dict = {}
    for i, (c, h) in enumerate(model.initial_state):
      feed_dict[c] = state[i].c
      feed_dict[h] = state[i].h

    vals = session.run(fetches, feed_dict)
    cost = vals["cost"]
    state = vals["final_state"]

    costs += cost
    iters += model.input.num_steps

    if verbose and step % (model.input.epoch_size // 10) == 10:
      print("%.3f perplexity: %.3f speed: %.0f wps" %
            (step * 1.0 / model.input.epoch_size, np.exp(costs / iters),
             iters * model.input.batch_size / (time.time() - start_time)))

  print("esize is %.3f, one epoch time: %.0f s" % (step, (time.time() - start_time)))
  return np.exp(costs / iters)
def get_config():
  if FLAGS.model == "small":
    return SmallConfig()
  elif FLAGS.model == "medium":
    return MediumConfig()
  elif FLAGS.model == "large":
    return LargeConfig()
  elif FLAGS.model == "test":
    return TestConfig()
  else:
    raise ValueError("Invalid model: %s" % FLAGS.model)
def main(_):
  if not FLAGS.data_path:
    raise ValueError("Must set --data_path to PTB data directory")
  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)

  # Construct the cluster and start the server.
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")

  # Get the number of workers.
  num_workers = len(worker_spec)

  cluster = tf.train.ClusterSpec({
      "ps": ps_spec,
      "worker": worker_spec})

  if not FLAGS.existing_servers:
    # Not using existing servers. Create an in-process server.
    server = tf.train.Server(
        cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    if FLAGS.job_name == "ps":
      server.join()

  is_chief = (FLAGS.task_index == 0)

  if FLAGS.num_gpus > 0:
    # if FLAGS.num_gpus < num_workers:
    #   raise ValueError("number of gpus is less than number of workers")
    # Avoid gpu allocation conflict: now allocate task_num -> #gpu
    # for each worker in the corresponding machine
    gpu = 0  # (FLAGS.task_index % FLAGS.num_gpus)
    worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
  elif FLAGS.num_gpus == 0:
    # Just allocate the CPU to worker server
    cpu = 0
    worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)

  # The device setter will automatically place Variables ops on separate
  # parameter servers (ps). The non-Variable ops will be placed on the workers.
  # The ps use CPU and workers use corresponding GPU
  raw_data = reader.ptb_raw_data(FLAGS.data_path)
  train_data, valid_data, test_data, _ = raw_data

  config = get_config()
  eval_config = get_config()
  eval_config.batch_size = 1
  eval_config.num_steps = 1

  # with tf.Graph().as_default():
  with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          ps_device="/job:ps/cpu:0",
          cluster=cluster)):
    '''raw_data = reader.ptb_raw_data(FLAGS.data_path)
    train_data, valid_data, test_data, _ = raw_data
    config = get_config()
    eval_config = get_config()
    eval_config.batch_size = 1
    eval_config.num_steps = 1'''
    # with tf.Graph().as_default():
    global_step = tf.Variable(0, name="global_step", trainable=False)
    initializer = tf.random_uniform_initializer(-config.init_scale,
                                                config.init_scale)

    with tf.name_scope("Train"):
      train_input = PTBInput(config=config, data=train_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="TrainInput")
      with tf.variable_scope("Model", reuse=None, initializer=initializer):
        m = PTBModel(is_training=True, config=config, input_=train_input, num_workers=num_workers,
                     global_step=global_step)
      tf.scalar_summary("Training Loss", m.cost)
      tf.scalar_summary("Learning Rate", m.lr)

    if FLAGS.sync_replicas:
      local_init_op = m.opt.local_step_init_op
      if is_chief:
        local_init_op = m.opt.chief_init_op
      ready_for_local_init_op = m.opt.ready_for_local_init_op
      # Initial token and chief queue runners required by the sync_replicas mode
      chief_queue_runner = m.opt.get_chief_queue_runner()
      sync_init_op = m.opt.get_init_tokens_op()

    # init_op = tf.global_variables_initializer()
    init_op = tf.initialize_all_variables()
    train_dir = tempfile.mkdtemp()

    with tf.name_scope("Valid"):
      valid_input = PTBInput(config=config, data=valid_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="ValidInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mvalid = PTBModel(is_training=False, config=config, input_=valid_input, num_workers=num_workers,
                          global_step=global_step)
      tf.scalar_summary("Validation Loss", mvalid.cost)

    with tf.name_scope("Test"):
      test_input = PTBInput(config=eval_config, data=test_data,
                            ix=0, worker_num=1, name="TestInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mtest = PTBModel(is_training=False, config=eval_config,
                         input_=test_input, num_workers=num_workers,
                         global_step=global_step)

    if FLAGS.sync_replicas:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          local_init_op=local_init_op,
          ready_for_local_init_op=ready_for_local_init_op,
          recovery_wait_secs=1,
          global_step=global_step)
    else:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          recovery_wait_secs=1,
          global_step=global_step)

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=False,
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." %
            FLAGS.task_index)

    if FLAGS.existing_servers:
      server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
      print("Using existing server at: %s" % server_grpc_url)
      session = sv.prepare_or_wait_for_session(server_grpc_url,
                                               config=sess_config)
    else:
      session = sv.prepare_or_wait_for_session(server.target, config=sess_config)

    print("Worker %d: Session initialization complete." % FLAGS.task_index)

    if FLAGS.sync_replicas and is_chief:
      # Chief worker will start the chief queue runner and call the init op.
      session.run(sync_init_op)
      sv.start_queue_runners(session, [chief_queue_runner])

    # sv = tf.train.Supervisor(logdir=FLAGS.save_path)
    # with sv.managed_session() as session:
    for i in range(config.max_max_epoch):
      lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
      m.assign_lr(session, config.learning_rate * lr_decay)

      print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
      train_perplexity = run_epoch(session, m, global_step, eval_op=m.train_op,
                                   verbose=True)
      print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
      valid_perplexity = run_epoch(session, mvalid, global_step)
      print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))

    test_perplexity = run_epoch(session, mtest, global_step)
    print("Test Perplexity: %.3f" % test_perplexity)

    if FLAGS.save_path:
      print("Saving model to %s." % FLAGS.save_path)
      sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)


if __name__ == "__main__":
  tf.app.run()
The timeline of the original single-GPU version is as follows (one iteration):
The timeline of worker 0 of the distributed version (1 ps server, 2 workers) is as follows (one iteration); worker 1 has a similar timeline:
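A per-iteration timeline like the ones above can be captured roughly like this (a minimal sketch, assuming the session, fetches and feed_dict from run_epoch above; the output file name is arbitrary, and the JSON is then opened in chrome://tracing):

# Minimal sketch: dump a Chrome-trace timeline for one training step.
from tensorflow.python.client import timeline

run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
# Same fetches/feed_dict as in run_epoch, but with tracing enabled.
vals = session.run(fetches, feed_dict,
                   options=run_options, run_metadata=run_metadata)
tl = timeline.Timeline(run_metadata.step_stats)
with open("timeline_worker%d.json" % FLAGS.task_index, "w") as f:
  f.write(tl.generate_chrome_trace_format())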
The machine has two Tesla M40 GPUs. The single-GPU version reaches about 11000 wps (with GPU utilization around 60%), while the between-graph version reaches only about 6000 wps per worker (GPU utilization a bit above 30% per GPU), so the speedup is only about 1.09 for two workers (two GPUs). I also made a multi-GPU version of the PTB model (without the distributed between-graph or in-graph frameworks), which achieves a 1.6+ speedup on the same machine. So what causes the poor performance of the distributed version?
The run commands:
PS: CUDA_VISIBLE_DEVICES="" python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=ps --task_index=0
Worker 0: CUDA_VISIBLE_DEVICES=0 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=0
Worker 1: CUDA_VISIBLE_DEVICES=1 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=1
(Tested with TensorFlow 0.12. A modified version for TF 1.0.0 performs even worse.)
Upvotes: 1
Views: 339
Reputation: 1
Maybe the bottleneck is disk I/O, since the single-GPU version's performance is about 11000 wps (with GPU utilization around 60%).
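One rough way to check this (a minimal sketch, assuming the reader module and data path from the question, and the "small" config's batch_size/num_steps) is to time the input pipeline alone, without running the LSTM, and compare its word rate with the training wps:

import time
import tensorflow as tf
import reader  # the reader module from the PTB tutorial

raw_data = reader.ptb_raw_data("/data/simple-examples/data/")
train_data, _, _, _ = raw_data

batch_size, num_steps = 20, 20  # "small" config values
input_data, targets = reader.ptb_producer(train_data, batch_size, num_steps)

with tf.Session() as sess:
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  n_batches = 500
  start = time.time()
  for _ in range(n_batches):
    sess.run([input_data, targets])  # input reading only, no model computation
  elapsed = time.time() - start
  print("input-only speed: %.0f wps" %
        (n_batches * batch_size * num_steps / elapsed))
  coord.request_stop()
  coord.join(threads)

If the input-only rate is far above 11000 wps, disk I/O is unlikely to be the limiting factor.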
Upvotes: 0
Reputation: 57973
What if you do this using single-process TensorFlow, i.e., a single process with CUDA_VISIBLE_DEVICES=0,1? It would make sense to try that first, to rule out issues that are unrelated to distributed TensorFlow.
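By single-process I mean something along these lines (a rough sketch of in-graph GPU towers with averaged gradients; build_tower_loss is a hypothetical helper that builds one PTBModel-style loss for its tower, and in practice the shared variables would usually be pinned to /cpu:0):

import tensorflow as tf

# Rough sketch: one process, two GPU towers sharing variables, gradients averaged.
opt = tf.train.GradientDescentOptimizer(1.0)
tower_grads = []
for i in range(2):  # run with CUDA_VISIBLE_DEVICES=0,1
  with tf.device("/gpu:%d" % i):
    with tf.variable_scope("Model", reuse=(i > 0)):
      loss = build_tower_loss(i)  # hypothetical: builds the loss for tower i
      tower_grads.append(opt.compute_gradients(loss))

# Average the gradients across towers and apply a single update.
avg_grads = []
for grads_and_vars in zip(*tower_grads):
  grads = [g for g, _ in grads_and_vars]
  var = grads_and_vars[0][1]
  avg_grads.append((tf.add_n(grads) / len(grads), var))
train_op = opt.apply_gradients(avg_grads)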
I've seen a situation with distributed TensorFlow, running the model from https://github.com/rafaljozefowicz/lm, where 8 GPUs on one worker were 8 times faster than 8 TensorFlow workers.
Some digging found that this issue was a large contributor: https://github.com/tensorflow/tensorflow/issues/6116
What happens is that sending large tensors through gRPC is extremely inefficient. The fix is in master but not yet in TF 1.0, so you have to get the latest nightly to try it out.
Upvotes: 1