Reputation: 31
I am computing a Hessian matrix on large data sets and am trying to run the computations in parallel on multiple CPUs. My setup currently has 1 node with 10 CPUs. I am using Python 2.7.
I wrote a small abstraction of my code to understand distributed TensorFlow better. Below is the error:
2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: Could not start gRPC server
I receive this error each time I run the code. However, it proceeds further and produces output for one of the two processes I have set up, as below:
2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config:
2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config:
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
After this, it continues to wait for the next
ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:
I have 2 questions here
[{'worker': 0}, {'worker': 0}]
Can I use a multiprocessing queue to share a dictionary between two sessions running in two different processes in TensorFlow?
Below is my code:
# build a python mutliprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue

cluster_spec = {"ps": ["localhost:2226"],
                "worker": ["localhost:2227",
                           "localhost:2228"]}
cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1,10,10)

def model_fun(input):
    print multiprocessing.current_process().name
    return input

def cifar10(device,return_dict,result_t):
    params = HParams(cluster=cluster,
                     job_name = device[0],
                     task_index = device[1])
    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
    input_img=[]
    true_lab=[]
    if params.job_name == "ps":
        ##try and wait for all the wokers t
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                                                      cluster=cluster)):
            # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
            # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
            input_img = tf.placeholder(dtype=tf.float32, shape=[10,])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
                hess_op = model_fun(input_img)
            global_step = tf.contrib.framework.get_or_create_global_step()
            sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                     global_step=global_step,
                                     init_op=tf.initialize_all_variables(),logdir='/tmp/mydir')
        with sv.prepare_or_wait_for_session(serv.target) as sess:
            step = 0
            while not sv.should_stop() :
                hess = sess.run(hess_op, feed_dict={input_img:im_Test })
                print(np.array(hess))
                print multiprocessing.current_process().name
                step += 1
                if(step==3):
                    return_dict[params.job_name] = params.task_index
                    result_t.put(return_dict)
                    break
            sv.stop()
            sess.close()
    return

if __name__ == '__main__':
    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]
               ]
    # start the clock once, before any process is launched
    start_time = time.time()
    for i in (devices):
        proc = multiprocessing.Process(target=cifar10,args=(i,return_dict,result))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()
    # print return_dict.values()
    kill = []
    while True:
        if result.empty() == True:
            break
        kill.append(result.get())
    print kill
    # elapsed time is "now minus start", not the reverse
    print("time taken = %d" % (time.time() - start_time))
Upvotes: 2
Views: 5734
Reputation: 737
In my case, the ps task raised this error and the worker kept waiting for a response when I submitted a TensorFlowOnSpark job in YARN cluster mode.
The ps error log is as follows:
2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node 0 on background process
2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ========
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.16.5.30:33088'], 'worker': ['172.16.5.22:41428', '172.16.5.30:33595']}
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU
2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
E0117 11:08:56.088501182 7395 ev_epoll1_linux.c:1051] grpc epoll fd: 10
E0117 11:08:56.088860707 7395 server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total 1 resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]}
Process Process-2:
Traceback (most recent call last):
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun
    cluster, server = ctx.start_cluster_server(1, args.rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 56, in start_cluster_server
    return TFNode.start_cluster_server(self, num_gpus, rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server
    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
UnknownError: Could not start gRPC server
The worker:1 log:
2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0
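I then checked the port on the ps server, and it was indeed already in use (the gRPC log above reports errno 98, "Address already in use", from bind).
Resubmitting the job solved the problem for me.
But if you receive this error every time you run the code, you should check more of the logs to find the cause.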
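As a quick way to do the port check described above, here is a minimal sketch using only the standard `socket` module. The helper names `port_is_free` and `find_free_port` are my own (hypothetical, not part of TensorFlow or TensorFlowOnSpark), and the ports in the usage section are simply the ones from the question's cluster spec. The check is only a snapshot: another process could still grab the port between the check and the moment `tf.train.Server` binds to it.

```python
import socket
from contextlib import closing


def port_is_free(host, port):
    """Return True if a TCP socket can currently bind to (host, port).

    Hypothetical helper: it mimics the bind() call that fails with
    errno 98 ("Address already in use") in the gRPC log.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        try:
            sock.bind((host, port))
        except socket.error:
            # bind() failed, most likely because the port is taken
            return False
        return True


def find_free_port(host="localhost"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


if __name__ == "__main__":
    # Ports taken from the cluster spec in the question (assumed example).
    for port in (2226, 2227, 2228):
        state = "free" if port_is_free("localhost", port) else "in use"
        print("localhost:%d is %s" % (port, state))
```

If a port shows up as in use, a stale process from an earlier run is a common culprit. Picking a fresh port with something like `find_free_port()` is one workaround, though every task in the cluster spec must still agree on the same addresses.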
Upvotes: 1