shreya kadambi

Reputation: 31

TensorFlow multiprocessing; UnknownError: Could not start gRPC server

I am computing a Hessian matrix on large data sets and am trying to perform these computations in parallel across multiple CPUs. My setup currently has one node with 10 CPUs, and I am working in Python 2.7.

I wrote a small abstraction of my code to understand distributed TensorFlow better. Below is the error:

2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: Could not start gRPC server

I receive this error each time I run the code. However, it proceeds further and produces output for one of the two processes I have set up, as shown below:

2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config: 

2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config: 

[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3
[  1.   2.   3.   4.   5.   6.   7.   8.   9.  10.]
Process-3

After this, it continues to wait, logging the following:

ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n    proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n    self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n    code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n    self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n    self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n    init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n    return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n    wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n    stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica: 

I have two questions here:

  1. How do I fix this error regarding gRPC?
  2. I have set up a multiprocessing queue 'result' using Manager() and pass it to both workers when setting up the processes. I expect that as soon as the condition is reached, each process will write its job ID to the queue; however, the queue always seems to contain the last finished process. Could this mean that the queue is being overwritten by another process somewhere?

[{'worker': 0}, {'worker': 0}]

Can I use a multiprocessing queue to share a dictionary between two sessions running in two different processes in TensorFlow?
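For comparison, here is a minimal sketch of the behaviour I expect from a Manager queue, independent of TensorFlow (the job function and its names are illustrative, not taken from the code below). Each process puts its own plain dict, keyed by its task index, so no entry can overwrite another:

# minimal sketch: one plain-dict result per process on a Manager queue
import multiprocessing

def job(task_index, result_q):
    # a plain dict is pickled at put() time, so later writes by other
    # processes cannot change what is already on the queue
    result_q.put({'worker': task_index})

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    result = manager.Queue()
    procs = [multiprocessing.Process(target=job, args=(i, result))
             for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    while not result.empty():
        print(result.get())  # expected: {'worker': 0} and {'worker': 1}

(In my code below, both workers write into the same shared return_dict under the same key, params.job_name, which is 'worker' for both, and then put that same shared dict on the queue, so perhaps that is where the overwrite happens.)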

Below is my code:

# build a python multiprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue

cluster_spec = {"ps": ["localhost:2226"],
                "worker": ["localhost:2227",
                           "localhost:2228"]}

cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1,10,10)

def model_fun(input):
    print multiprocessing.current_process().name
    return input

def cifar10(device,return_dict,result_t):
    params = HParams(cluster=cluster,
                     job_name = device[0],
                     task_index = device[1])

    serv = tf.train.Server(cluster, job_name=params.job_name, task_index=params.task_index)
    input_img=[]
    true_lab=[]

    if params.job_name == "ps":
        ## the ps job just waits for all the workers
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                                                      cluster=cluster)):
            # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
            # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
            input_img = tf.placeholder(dtype=tf.float32, shape=[10,])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
                hess_op = model_fun(input_img)
                global_step = tf.contrib.framework.get_or_create_global_step()
                sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                         global_step=global_step,
                                         init_op=tf.initialize_all_variables(), logdir='/tmp/mydir')
                with sv.prepare_or_wait_for_session(serv.target) as sess:
                    step = 0
                    while not sv.should_stop() :
                        hess = sess.run(hess_op, feed_dict={input_img:im_Test })
                        print(np.array(hess))
                        print multiprocessing.current_process().name
                        step += 1
                        if(step==3):
                            return_dict[params.job_name] = params.task_index
                            result_t.put(return_dict)
                            break
                    sv.stop()
                    sess.close()


    return

if __name__ == '__main__':

    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]
               ]

    start_time = time.time()
    for device in devices:
        proc = multiprocessing.Process(target=cifar10, args=(device, return_dict, result))
        processes.append(proc)
        proc.start()

    for p in processes:
        p.join()

    # print return_dict.values()
    kill = []
    while True:
        if result.empty():
            break
        kill.append(result.get())
        print kill


    print("time taken = %d" % (start_time - time.time()))

Upvotes: 2

Views: 5734

Answers (1)

Matiji66

Reputation: 737

In my case, I found that ps raised this error and the worker waited for a response when I submitted a TensorFlowOnSpark job in YARN cluster mode.

The ps error is as follows:

2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node 0 on background process
2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ========
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.16.5.30:33088'], 'worker': ['172.16.5.22:41428', '172.16.5.30:33595']}
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU
2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
E0117 11:08:56.088501182 7395 ev_epoll1_linux.c:1051] grpc epoll fd: 10
E0117 11:08:56.088860707 7395 server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total 1 resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]}
Process Process-2:
Traceback (most recent call last):
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun
    cluster, server = ctx.start_cluster_server(1, args.rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 56, in start_cluster_server
    return TFNode.start_cluster_server(self, num_gpus, rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server
    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
UnknownError: Could not start gRPC server

worker:1 log:

2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0

Then I checked the port on the ps server. Yes, the port was in use.
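As a quick sanity check (an illustrative snippet, not part of TensorFlowOnSpark), you can test whether an address is already bound before starting tf.train.Server; a failing bind() raises the same errno 98 ("Address already in use") that gRPC reports above:

# illustrative check: try to bind the address the ps task wants to use
import errno
import socket

def port_is_free(host, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return True
    except socket.error as e:
        if e.errno == errno.EADDRINUSE:  # errno 98, as in the gRPC log
            return False
        raise
    finally:
        s.close()

print(port_is_free('172.16.5.30', 33088))  # the ps address from the cluster spec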

So resubmitting the job solved the problem.

But if you receive this error every time you run the code, you should check more logs to find the cause.

Upvotes: 1
