Reputation: 119
We are planning to implement distributed training in TensorFlow, using TensorFlow Distributed (https://www.tensorflow.org/deploy/distributed). We are able to run distributed training with between-graph replication and asynchronous updates. Below is the code snippet.
.....
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")

# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

# Create and start a server for the local task.
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)

img_width, img_height = 124, 124

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":
    ####### Assigns ops to the local worker by default. #######
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        ####### Set Keras learning phase to train. #######
        K.set_learning_phase(1)
        # Do not initialize variables on the fly.
        K.manual_variable_initialization(True)

        if K.image_data_format() == 'channels_first':
            input_shape = (3, img_width, img_height)
        else:
            input_shape = (img_width, img_height, 3)

        X = tf.placeholder(tf.float32, shape=[None, img_width, img_height, 3], name="X")
        Y = tf.placeholder(tf.float32, shape=[None, n_classes], name="Y")

        print("Building keras model")
        ....
        ....

        ####### Defining our total loss #######
        ####### Defining our TF Optimizer and passing hyperparameters #######
        .......
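For context on what replica_device_setter does above: by default it pins each newly created variable to the "ps" tasks in round-robin order, while all other ops stay on the local worker device. A toy sketch of that placement policy (plain Python with illustrative names, not TensorFlow's actual implementation):

```python
# Toy mimic of replica_device_setter's default placement policy:
# variables go to the "ps" tasks in turn, everything else stays on the worker.
def make_device_chooser(num_ps_tasks, worker_device):
    counter = {"next_ps": 0}

    def choose(op_kind):
        if op_kind == "Variable":
            device = "/job:ps/task:%d" % counter["next_ps"]
            counter["next_ps"] = (counter["next_ps"] + 1) % num_ps_tasks
            return device
        return worker_device

    return choose

choose = make_device_chooser(num_ps_tasks=2, worker_device="/job:worker/task:0")
print(choose("Variable"))  # /job:ps/task:0
print(choose("Variable"))  # /job:ps/task:1
print(choose("MatMul"))    # /job:worker/task:0
print(choose("Variable"))  # /job:ps/task:0  (wraps around)
```

These device strings are recorded in the graph, which is exactly what causes trouble later when the graph is loaded outside the training cluster.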
We have defined our training supervisor as below.

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         logdir=logdir,
                         init_op=init_op,
                         saver=saver,
                         summary_op=summary_op,
                         global_step=global_step)
We have also initialized the supervisor session using the snippet below.

with sv.prepare_or_wait_for_session(server.target) as sess:
Then we pass different batches during training. Up to this point everything works fine. But when we try to save/export the model for TensorFlow Serving, it does not generate the right set of checkpoint files for serving in production. While hosting the checkpoint files through tensorflow_model_server, we get the error below.
Loading servable: {name: default version: 2} failed: Invalid argument:
Cannot assign a device for operation 'init_all_tables': Operation was
explicitly assigned to /job:worker/task:0 but available devices are
[ /job:localhost/replica:0/task:0/cpu:0 ]. Make sure the device
specification refers to a valid device.
[[Node: init_all_tables = NoOp[_device="/job:worker/task:0"]()]]
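This error is the classic symptom of baked-in device annotations: the exported GraphDef still pins ops to /job:worker/task:0, which does not exist in the serving process. The clear_devices=True option in TensorFlow's export APIs addresses this by blanking the device field of every node before export. Conceptually (a plain-Python stand-in for NodeDefs, not TensorFlow code):

```python
# Each exported node carries an optional device annotation. Serving fails when
# that annotation names a job/task that only existed in the training cluster;
# clearing it lets the serving runtime place the op on whatever device it has.
def clear_devices(graph_def):
    for node in graph_def["node"]:
        node["device"] = ""
    return graph_def

graph_def = {"node": [
    {"name": "init_all_tables", "device": "/job:worker/task:0"},
    {"name": "X", "device": ""},
]}
clear_devices(graph_def)
print(all(n["device"] == "" for n in graph_def["node"]))  # True
```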
Just to note, we have also tried following methods to save the trained graph.
i) SavedModelBuilder
builder = saved_model_builder.SavedModelBuilder(export_path)
ii) Model exporter
export_path = "/saved_graph/"
model_exporter.export(export_path, sess)
iii) tf.train.Saver
We couldn't find any direct article that shows a complete example or explains things in detail. We have gone through the reference links below.
https://github.com/tensorflow/tensorflow/issues/5439
https://github.com/tensorflow/tensorflow/issues/5110
Running distributed Tensorflow with InvalidArgumentError: You must feed a value for placeholder tensor 'Placeholder' with dtype float
Any suggestions or reference would be of great help.
Thank you.
As per the suggestions, we have tried using clear_devices=True while exporting the model, but that didn't help. Below is the code snippet we used.
for epoch in range(training_epochs):
    epoch_num = 0
    batch_count = int(num_img / batch_size)
    count = 0
    for i in range(batch_count):
        epoch_num = 0
        # This will create batches out of our training dataset and it will
        # be passed to the feed_dict.
        batch_x, batch_y = next_batch(batch_size, epoch_num, train_data,
                                      train_labels, num_img)
        # Perform the operations we defined earlier on the batch.
        _, cost, step = sess.run([train_op, cross_entropy, global_step],
                                 feed_dict={X: batch_x, Y: batch_y})

    sess.run(tf.global_variables_initializer())
    builder = tf.saved_model.builder.SavedModelBuilder(path)
    builder.add_meta_graph_and_variables(
        sess,
        [tf.saved_model.tag_constants.SERVING],
        signature_def_map={
            "magic_model":
                tf.saved_model.signature_def_utils.predict_signature_def(
                    inputs={"image": X},
                    outputs={"prediction": preds})
        }, clear_devices=True)
    builder.save()

sv.stop()
print("Done!!")
We get the error below when using clear_devices=True.
Error:
Traceback (most recent call last):
  File "insulator_classifier.py", line 370, in <module>
    tf.app.run()
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/platform/app.py", line 48, in run
    _sys.exit(main(_sys.argv[:1] + flags_passthrough))
  File "insulator_classifier.py", line 283, in main
    }, clear_devices=False)
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/saved_model/builder_impl.py", line 364, in add_meta_graph_and_variables
    allow_empty=True)
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/training/saver.py", line 1140, in __init__
    self.build()
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/training/saver.py", line 1172, in build
    filename=self._filename)
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/training/saver.py", line 677, in build
    filename_tensor = constant_op.constant(filename or "model")
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/constant_op.py", line 106, in constant
    attrs={"value": tensor_value, "dtype": dtype_value}, name=name).outputs[0]
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 2582, in create_op
    self._check_not_finalized()
  File "/root/anaconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 2290, in _check_not_finalized
    raise RuntimeError("Graph is finalized and cannot be modified.")
RuntimeError: Graph is finalized and cannot be modified.
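What this traceback indicates: the supervisor finalizes the graph once the session is created, and add_meta_graph_and_variables then tries to construct a new Saver, which must add ops to the now-finalized graph. The failure mode in miniature (a toy graph class of our own, not TensorFlow's):

```python
# Toy mimic of TensorFlow's graph finalization: once finalized, any attempt
# to add an op (which is what building a Saver does internally) raises.
class ToyGraph:
    def __init__(self):
        self.ops = []
        self.finalized = False

    def add_op(self, name):
        if self.finalized:
            raise RuntimeError("Graph is finalized and cannot be modified.")
        self.ops.append(name)

g = ToyGraph()
g.add_op("train_op")
g.finalized = True          # roughly what happens at session creation
try:
    g.add_op("save/Const")  # what the builder's internal Saver attempts
except RuntimeError as e:
    print(e)                # Graph is finalized and cannot be modified.
```

So any op-creating call, including the export machinery's Saver construction, has to happen before the graph gets finalized.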
Is there anything we are missing here?
We can see that it works with suggestion 2) from @Tianjin Gu. Please see the code snippet below.
X = tf.placeholder(tf.float32, shape=[None, img_width, img_height, 3], name="X")
Y = tf.placeholder(tf.float32, shape=[None, n_classes], name="Y")
....
....
model_exporter = exporter.Exporter(saver)
model_exporter.init(
    tf.get_default_graph().as_graph_def(),
    named_graph_signatures={
        'inputs': exporter.generic_signature({'input': X}),
        'outputs': exporter.generic_signature({'output': Y})},
    clear_devices=True)
export_path = "/export_path"
As we export, we see this warning:
WARNING:tensorflow:From test_classifier.py:283: Exporter.export (from tensorflow.contrib.session_bundle.exporter) is deprecated and will be removed after 2017-06-30.
So ideally we should use "tf.saved_model.builder.SavedModelBuilder", but that is not working for some reason.
Any further suggestion ?
Thank you.
Upvotes: 1
Views: 1588
Reputation: 784
Notice clear_devices=True:
If you use SavedModelBuilder, set clear_devices=True when calling add_meta_graph() or add_meta_graph_and_variables().
If you use exporter, set clear_devices=True when constructing exporter.Exporter.
For the SavedModelBuilder issue: you do not need to create a SavedModelBuilder every epoch, so you should move those lines before the for loop; you do not need to save the model every epoch either, so you can move builder.save() after the for loop. The code will then look like:
builder = tf.saved_model.builder.SavedModelBuilder(path)
builder.add_meta_graph_and_variables(
    sess,
    [tf.saved_model.tag_constants.SERVING],
    signature_def_map={
        "magic_model":
            tf.saved_model.signature_def_utils.predict_signature_def(
                inputs={"image": X},
                outputs={"prediction": preds})
    },
    clear_devices=True)

for epoch in range(training_epochs):
    epoch_num = 0
    batch_count = int(num_img / batch_size)
    count = 0
    for i in range(batch_count):
        epoch_num = 0
        # This will create batches out of our training dataset and it will
        # be passed to the feed_dict.
        batch_x, batch_y = next_batch(batch_size, epoch_num, train_data,
                                      train_labels, num_img)
        # Perform the operations we defined earlier on the batch.
        _, cost, step = sess.run([train_op, cross_entropy, global_step],
                                 feed_dict={X: batch_x, Y: batch_y})
    sess.run(tf.global_variables_initializer())

builder.save()
sv.stop()
print("Done!!")
For the exporter.Exporter issue: the warning does not matter too much; you can still load the exported files with TensorFlow Serving.
Upvotes: 2