Between-graph replication version of the PTB RNN model is slower than the single-GPU version (even in TF 1.0.0)

I changed the PTB model (you can find it in tensorflow/models/tutorials/rnn/ptb) to a between-graph replicated version, but this distributed version (1 ps server, 2 workers) shows no speedup, even when the ps and the workers are on a single machine. Timeline profiling shows significant delays between GPU jobs and CPU jobs in the distributed version. The code and timeline graphs are below:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import numpy as np
import tensorflow as tf

import reader
import tempfile

flags = tf.flags
logging = tf.logging

flags.DEFINE_string(
    "model", "small",
    "A type of model. Possible options are: small, medium, large.")
flags.DEFINE_string("data_path", None,
                    "Where the training/test data is stored.")
flags.DEFINE_string("save_path", None,
                    "Model output directory.")
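flags.DEFINE_bool("use_fp16", False,
                  "Train using 16-bit floats instead of 32bit floats")

# The default host:port for ps_hosts is not shown here; pass --ps_hosts explicitly.
flags.DEFINE_string("ps_hosts", None,
                    "Comma-separated list of hostname:port pairs")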
flags.DEFINE_string("worker_hosts", "IP1:2223,IP1:2224",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("job_name", None,"job name: worker or ps")
flags.DEFINE_integer("task_index", None,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the master worker task the performs the variable "
                     "initialization ")
flags.DEFINE_integer("num_gpus", 1,
                     "Total number of gpus for each machine."
                     "If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update "
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_boolean("sync_replicas", False,
                     "Use the sync_replicas (synchronized replicas) mode, "
                     "wherein the parameter updates from workers are aggregated "
                     "before applied to avoid stale gradients")
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exists. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")

FLAGS = flags.FLAGS


def data_type():
  return tf.float16 if FLAGS.use_fp16 else tf.float32

class PTBInput(object):
  """The input data."""

  def __init__(self, config, data, ix, worker_num, name=None):
    data_len = len(data) // worker_num
    data = data[data_len * ix:data_len * (ix + 1)]

    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class PTBModel(object):
  """The PTB model."""

  def __init__(self, is_training, config, input_, num_workers=0, global_step=None):
    self._input = input_

    batch_size = input_.batch_size
    num_steps = input_.num_steps
    size = config.hidden_size
    vocab_size = config.vocab_size

    # Slightly better results can be obtained with forget gate biases
    # initialized to 1 but the hyperparameters of the model would need to be
    # different than reported in the paper.
    def lstm_cell():
      # return tf.contrib.rnn.BasicLSTMCell(
      return tf.nn.rnn_cell.BasicLSTMCell(
          size, forget_bias=0.0, state_is_tuple=True)
    attn_cell = lstm_cell
    if is_training and config.keep_prob < 1:
      def attn_cell():
        return tf.contrib.rnn.DropoutWrapper(
            lstm_cell(), output_keep_prob=config.keep_prob)
    # cell = tf.contrib.rnn.MultiRNNCell(
    cell = tf.nn.rnn_cell.MultiRNNCell(
        [attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)

    self._initial_state = cell.zero_state(batch_size, data_type())

    with tf.device("/cpu:0"):
      embedding = tf.get_variable(
          "embedding", [vocab_size, size], dtype=data_type())
      inputs = tf.nn.embedding_lookup(embedding, input_.input_data)

    if is_training and config.keep_prob < 1:
      inputs = tf.nn.dropout(inputs, config.keep_prob)

    # Simplified version of models/tutorials/rnn/'s rnn().
    # This builds an unrolled LSTM for tutorial purposes only.
    # In general, use the rnn() or state_saving_rnn() from
    # The alternative version of the code below is:
    # inputs = tf.unstack(inputs, num=num_steps, axis=1)
    # outputs, state = tf.nn.rnn(cell, inputs,
    #                            initial_state=self._initial_state)
    outputs = []
    state = self._initial_state
    with tf.variable_scope("RNN"):
      for time_step in range(num_steps):
        if time_step > 0: tf.get_variable_scope().reuse_variables()
        (cell_output, state) = cell(inputs[:, time_step, :], state)
        # Collect the per-step outputs; they are concatenated below.
        outputs.append(cell_output)

    output = tf.reshape(tf.concat(1, outputs), [-1, size])
    softmax_w = tf.get_variable(
        "softmax_w", [size, vocab_size], dtype=data_type())
    softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
    logits = tf.matmul(output, softmax_w) + softmax_b
    # loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
    loss = tf.nn.seq2seq.sequence_loss_by_example(
        [logits],
        [tf.reshape(input_.targets, [-1])],
        [tf.ones([batch_size * num_steps], dtype=data_type())])
    self._cost = cost = tf.reduce_sum(loss) / batch_size
    self._final_state = state

    if not is_training:
      return

    self._lr = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
                                      config.max_grad_norm)
    self._opt = tf.train.GradientDescentOptimizer(self._lr)

    if FLAGS.sync_replicas:
      if FLAGS.replicas_to_aggregate is None:
        replicas_to_aggregate = num_workers
      else:
        replicas_to_aggregate = FLAGS.replicas_to_aggregate

      self._opt = tf.train.SyncReplicasOptimizer(
          self._opt,
          replicas_to_aggregate=replicas_to_aggregate,
          total_num_replicas=num_workers)

    # train_step = opt.minimize(cross_entropy, global_step=global_step)

    self._train_op = self._opt.apply_gradients(
        zip(grads, tvars),
        # global_step=tf.contrib.framework.get_or_create_global_step())
        global_step=global_step)

    self._new_lr = tf.placeholder(
        tf.float32, shape=[], name="new_learning_rate")
    self._lr_update = tf.assign(self._lr, self._new_lr)

  def assign_lr(self, session, lr_value):
    session.run(self._lr_update, feed_dict={self._new_lr: lr_value})

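  # These accessors are read as attributes (e.g. model.cost), so they
  # must be properties.
  @property
  def input(self):
    return self._input

  @property
  def initial_state(self):
    return self._initial_state

  @property
  def cost(self):
    return self._cost

  @property
  def final_state(self):
    return self._final_state

  @property
  def lr(self):
    return self._lr

  @property
  def opt(self):
    return self._opt

  @property
  def train_op(self):
    return self._train_op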

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

class MediumConfig(object):
  """Medium config."""
  init_scale = 0.05
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 35
  hidden_size = 650
  max_epoch = 6
  max_max_epoch = 39
  keep_prob = 0.5
  lr_decay = 0.8
  batch_size = 20
  vocab_size = 10000

class LargeConfig(object):
  """Large config."""
  init_scale = 0.04
  learning_rate = 1.0
  max_grad_norm = 10
  num_layers = 2
  num_steps = 35
  hidden_size = 1500
  max_epoch = 14
  max_max_epoch = 55
  keep_prob = 0.35
  lr_decay = 1 / 1.15
  batch_size = 20
  vocab_size = 10000

class TestConfig(object):
  """Tiny config, for testing."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 1
  num_layers = 1
  num_steps = 2
  hidden_size = 2
  max_epoch = 1
  max_max_epoch = 1
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

def run_epoch(session, model, global_step, eval_op=None, verbose=False):
  """Runs the model on the given data."""
  start_time = time.time()
  costs = 0.0
  iters = 0
  state = session.run(model.initial_state)

  fetches = {
      "cost": model.cost,
      "final_state": model.final_state,
      "global_step": global_step,
  }
  if eval_op is not None:
    fetches["eval_op"] = eval_op

  for step in range(model.input.epoch_size):
    feed_dict = {}
    for i, (c, h) in enumerate(model.initial_state):
      feed_dict[c] = state[i].c
      feed_dict[h] = state[i].h

    vals = session.run(fetches, feed_dict)
    cost = vals["cost"]
    state = vals["final_state"]

    costs += cost
    iters += model.input.num_steps

    if verbose and step % (model.input.epoch_size // 10) == 10:
      print("%.3f perplexity: %.3f speed: %.0f wps" %
            (step * 1.0 / model.input.epoch_size, np.exp(costs / iters),
             iters * model.input.batch_size / (time.time() - start_time)))
  print("esize is %.3f, one epoch time: %.0f s" % (step,(time.time() - start_time))) 
  return np.exp(costs / iters)

def get_config():
  if FLAGS.model == "small":
    return SmallConfig()
  elif FLAGS.model == "medium":
    return MediumConfig()
  elif FLAGS.model == "large":
    return LargeConfig()
  elif FLAGS.model == "test":
    return TestConfig()
  else:
    raise ValueError("Invalid model: %s" % FLAGS.model)

def main(_):
  if not FLAGS.data_path:
    raise ValueError("Must set --data_path to PTB data directory")

  if FLAGS.job_name is None or FLAGS.job_name == "":
    raise ValueError("Must specify an explicit `job_name`")
  if FLAGS.task_index is None or FLAGS.task_index =="":
    raise ValueError("Must specify an explicit `task_index`")

  print("job name = %s" % FLAGS.job_name)
  print("task index = %d" % FLAGS.task_index)

  #Construct the cluster and start the server
  ps_spec = FLAGS.ps_hosts.split(",")
  worker_spec = FLAGS.worker_hosts.split(",")

  # Get the number of workers.
  num_workers = len(worker_spec)

  cluster = tf.train.ClusterSpec({
      "ps": ps_spec,
      "worker": worker_spec})

  if not FLAGS.existing_servers:
    # Not using existing servers. Create an in-process server.
    server = tf.train.Server(
        cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
    if FLAGS.job_name == "ps":
      # The parameter server only serves variables; block here forever.
      server.join()

  is_chief = (FLAGS.task_index == 0)
  if FLAGS.num_gpus > 0:
    # if FLAGS.num_gpus < num_workers:
    #  raise ValueError("number of gpus is less than number of workers")
    # Avoid gpu allocation conflict: now allocate task_num -> #gpu
    # for each worker in the corresponding machine
    gpu = 0 # (FLAGS.task_index % FLAGS.num_gpus)
    worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
  elif FLAGS.num_gpus == 0:
    # Just allocate the CPU to worker server
    cpu = 0
    worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)
  # The device setter will automatically place Variables ops on separate
  # parameter servers (ps). The non-Variable ops will be placed on the workers.
  # The ps use CPU and workers use corresponding GPU
  raw_data = reader.ptb_raw_data(FLAGS.data_path)
  train_data, valid_data, test_data, _ = raw_data

  config = get_config()
  eval_config = get_config()
  eval_config.batch_size = 1
  eval_config.num_steps = 1

  # with tf.Graph().as_default():
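  with tf.device(
      tf.train.replica_device_setter(
          worker_device=worker_device,
          ps_device="/job:ps/cpu:0",
          cluster=cluster)):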
    '''raw_data = reader.ptb_raw_data(FLAGS.data_path)
    train_data, valid_data, test_data, _ = raw_data

    config = get_config()
    eval_config = get_config()
    eval_config.batch_size = 1
    eval_config.num_steps = 1'''

    # with tf.Graph().as_default():
    global_step = tf.Variable(0, name="global_step", trainable=False)
    initializer = tf.random_uniform_initializer(-config.init_scale,
                                                config.init_scale)

    with tf.name_scope("Train"):
      train_input = PTBInput(config=config, data=train_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="TrainInput")
      with tf.variable_scope("Model", reuse=None, initializer=initializer):
        m = PTBModel(is_training=True, config=config, input_=train_input, num_workers=num_workers,
                     global_step = global_step)
      tf.scalar_summary("Training Loss", m.cost)
      tf.scalar_summary("Learning Rate", m.lr)

    if FLAGS.sync_replicas:
      local_init_op = m.opt.local_step_init_op
      if is_chief:
        local_init_op = m.opt.chief_init_op

      ready_for_local_init_op = m.opt.ready_for_local_init_op

      # Initial token and chief queue runners required by the sync_replicas mode
      chief_queue_runner = m.opt.get_chief_queue_runner()
      sync_init_op = m.opt.get_init_tokens_op()

    # init_op = tf.global_variables_initializer()
    init_op = tf.initialize_all_variables()
    train_dir = tempfile.mkdtemp()

    with tf.name_scope("Valid"):
      valid_input = PTBInput(config=config, data=valid_data,
                             ix=FLAGS.task_index, worker_num=num_workers, name="ValidInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mvalid = PTBModel(is_training=False, config=config, input_=valid_input, num_workers=num_workers,
                          global_step=global_step)
      tf.scalar_summary("Validation Loss", mvalid.cost)

    with tf.name_scope("Test"):
      test_input = PTBInput(config=eval_config, data=test_data,
                            ix=0, worker_num=1, name="TestInput")
      with tf.variable_scope("Model", reuse=True, initializer=initializer):
        mtest = PTBModel(is_training=False, config=eval_config,
                         input_=test_input, num_workers=num_workers,
                         global_step=global_step)

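    if FLAGS.sync_replicas:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          local_init_op=local_init_op,
          ready_for_local_init_op=ready_for_local_init_op,
          global_step=global_step)
    else:
      sv = tf.train.Supervisor(
          is_chief=is_chief,
          logdir=train_dir,
          init_op=init_op,
          global_step=global_step)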

    sess_config = tf.ConfigProto(
        device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

    # The chief worker (task_index==0) session will prepare the session,
    # while the remaining workers will wait for the preparation to complete.
    if is_chief:
      print("Worker %d: Initializing session..." % FLAGS.task_index)
    else:
      print("Worker %d: Waiting for session to be initialized..." %
            FLAGS.task_index)

    if FLAGS.existing_servers:
      server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
      print("Using existing server at: %s" % server_grpc_url)

      session = sv.prepare_or_wait_for_session(server_grpc_url,
                                               config=sess_config)
    else:
      session = sv.prepare_or_wait_for_session(server.target, config=sess_config)

    print("Worker %d: Session initialization complete." % FLAGS.task_index)

    if FLAGS.sync_replicas and is_chief:
      # Chief worker will start the chief queue runner and call the init op.
      session.run(sync_init_op)
      sv.start_queue_runners(session, [chief_queue_runner])

    # sv = tf.train.Supervisor(logdir=FLAGS.save_path)
    # with sv.managed_session() as session:
    for i in range(config.max_max_epoch):
      lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
      m.assign_lr(session, config.learning_rate * lr_decay)

      print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
      train_perplexity = run_epoch(session, m, global_step, eval_op=m.train_op,
                                   verbose=True)
      print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
      valid_perplexity = run_epoch(session, mvalid, global_step)
      print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))

    test_perplexity = run_epoch(session, mtest, global_step)
    print("Test Perplexity: %.3f" % test_perplexity)

    if FLAGS.save_path:
      print("Saving model to %s." % FLAGS.save_path)
      sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)

if __name__ == "__main__":
  tf.app.run()

The timeline of the original single-GPU version (with 1 ps server, 2 workers) is as follows (one iteration):

[image: timeline of the single-GPU version, one iteration]

The timeline of worker 0 of the distributed version is as follows (one iteration); worker 1 has a similar timeline:

[image: timeline of worker 0 in the distributed version, one iteration]

The machine has two Tesla M40 GPUs. The single-GPU version runs at about 11000 wps (GPU utilization about 60%), while the between-graph version reaches only about 6000 wps per worker (GPU utilization a little over 30% on each GPU), so the speedup is only 1.09 for two workers (two GPUs). I also made a multi-GPU version of the PTB model (without the distributed between-graph or in-graph frameworks), which gets a 1.6+ speedup on the same machine. So what causes the poor performance of the distributed version?
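
The speedup figure can be checked directly from the throughput numbers quoted above. This is only a small arithmetic sketch; the helper name `speedup` is illustrative and not part of the script:

```python
def speedup(single_wps, per_worker_wps, num_workers):
    """Aggregate throughput of all workers relative to the single-GPU run."""
    return per_worker_wps * num_workers / single_wps


single_gpu_wps = 11000.0   # single-GPU version, as measured above
worker_wps = 6000.0        # each worker in the between-graph version
workers = 2

s = speedup(single_gpu_wps, worker_wps, workers)
efficiency = s / workers   # fraction of ideal linear scaling

print("speedup: %.2f, scaling efficiency: %.0f%%" % (s, efficiency * 100))
```

With these numbers the two workers together deliver roughly 55% of ideal linear scaling, compared with about 80% for the 1.6x multi-GPU version.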

The run commands:

PS: CUDA_VISIBLE_DEVICES="" python --data_path=/data/simple-examples/data/ --model small --job_name=ps --task_index=0

Worker 0: CUDA_VISIBLE_DEVICES=0 python --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=0

Worker 1: CUDA_VISIBLE_DEVICES=1 python --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=1
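
The two worker commands differ only in --task_index, which PTBInput uses to select that worker's slice of the training data. A minimal sketch of that slicing on toy data, assuming the same arithmetic as in PTBInput; `shard` and `epoch_size` are illustrative helper names, not functions from the script:

```python
def shard(data, task_index, num_workers):
    """Return the contiguous slice of `data` owned by worker `task_index`."""
    shard_len = len(data) // num_workers
    return data[shard_len * task_index:shard_len * (task_index + 1)]


def epoch_size(num_words, batch_size, num_steps):
    """Number of steps per epoch, using the same formula as PTBInput."""
    return ((num_words // batch_size) - 1) // num_steps


toy_data = list(range(10))            # stand-in for the PTB word ids
shard0 = shard(toy_data, 0, 2)        # slice seen by worker 0
shard1 = shard(toy_data, 1, 2)        # slice seen by worker 1
print(shard0, shard1)

# Steps per epoch with and without sharding (toy sizes, not PTB sizes).
full_steps = epoch_size(1000, batch_size=5, num_steps=4)
half_steps = epoch_size(500, batch_size=5, num_steps=4)
print(full_steps, half_steps)
```

Because each worker runs about half as many steps per epoch, an epoch only finishes faster if the per-worker wps stays close to the single-GPU figure.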

( Tested with tensorflow 0.12. A modified version for tf 1.0.0 has worse performance )

Maybe the bottleneck is disk I/O, since the single-GPU version reaches about 11000 wps with GPU utilization of only about 60%.

Yaroslav Bulatov

What if you do this in single-process TensorFlow, i.e., a single process with CUDA_VISIBLE_DEVICES=0,1? It would make sense to try that first, to rule out issues that are unrelated to distributed TensorFlow.

I've seen a situation with distributed TensorFlow running model from where 8 GPU on one worker was 8 times faster than 8 TensorFlow workers.

Some digging found that this issue was a large contributor

What happens now is that sending large Tensors through gRPC is extremely inefficient. The fix is in master but not yet in TF 1.0, so you have to get the latest nightly build to try it out.
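
To get a feel for how much data that could be, here is a rough size estimate for the SmallConfig model in the question. It is only a sketch under stated assumptions: every variable lives on the ps, each step moves each variable densely as float32, and a basic LSTM layer holds (input + hidden) x 4 x hidden weights plus 4 x hidden biases. Embedding gradients may be sent sparsely in practice, so treat the result as an upper bound. The helper names are illustrative:

```python
def lstm_layer_params(input_size, hidden_size):
    """Weights and biases of one basic LSTM layer (4 gates)."""
    return (input_size + hidden_size) * 4 * hidden_size + 4 * hidden_size


def model_params(vocab_size, hidden_size, num_layers):
    """Total trainable parameters: embedding + stacked LSTM + softmax."""
    embedding = vocab_size * hidden_size
    lstm = num_layers * lstm_layer_params(hidden_size, hidden_size)
    softmax = hidden_size * vocab_size + vocab_size
    return embedding + lstm + softmax


BYTES_PER_FLOAT32 = 4

# SmallConfig values from the question.
params = model_params(vocab_size=10000, hidden_size=200, num_layers=2)
megabytes = params * BYTES_PER_FLOAT32 / 1e6

print("parameters: %d (about %.1f MB as float32)" % (params, megabytes))
```

Under these assumptions each worker could exchange on the order of tens of megabytes with the ps per training step, which is where an inefficient gRPC path for large Tensors would hurt most.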

