andi2.2

Reputation: 134

Sagemaker Endpoint BrokenPipeError at DeepAR Prediction

I've created a SageMaker endpoint from a trained DeepAR model using the following code:

job_name = estimator.latest_training_job.job_name

endpoint_name = sagemaker_session.endpoint_from_job(
    job_name=job_name,
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    image_uri=image_uri,
    role=role
)

Now I want to test my model using a test.json dataset (66.2 MB). I've created that file according to various tutorials/sample notebooks (it's the same as train.json, but with each series shortened by prediction_length values).
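The truncation step looks roughly like this (a minimal sketch, not my exact code; it assumes time_series is a list of pandas Series indexed by timestamp and that the file uses the JSON Lines layout from the usual DeepAR sample notebooks):

import json

def series_to_obj(ts):
    # DeepAR expects one JSON object per line with "start" and "target"
    return {"start": str(ts.index[0]), "target": list(ts)}

with open("test.json", "wb") as f:
    for ts in time_series:
        # same series as in train.json, shortened by prediction_length values
        truncated = ts.iloc[:-prediction_length]
        f.write(json.dumps(series_to_obj(truncated)).encode("utf-8"))
        f.write(b"\n")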

To query the endpoint with these series, I've written the following predictor class:

class DeepARPredictor(sagemaker.predictor.Predictor):
    def set_prediction_parameters(self, freq, prediction_length):
        self.freq = freq
        self.prediction_length = prediction_length

    def predict(self, ts, num_samples=100, quantiles=["0.1", "0.5", "0.9"]):
        prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
        req = self.__encode_request(ts, num_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
        return self.__decode_response(res, prediction_times)

    def __encode_request(self, ts, num_samples, quantiles):
        instances = [{"start": str(ts[k].index[0]), "target": list(ts[k])} for k in range(len(ts))]
        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles"],
            "quantiles": quantiles,
        }
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, prediction_times):
        response_data = json.loads(response.decode("utf-8"))
        list_of_df = []
        for k in range(len(prediction_times)):
            prediction_index = pd.date_range(
                start=prediction_times[k], freq=self.freq, periods=self.prediction_length
            )
            list_of_df.append(
                pd.DataFrame(data=response_data["predictions"][k]["quantiles"], index=prediction_index)
            )
        return list_of_df

But after running the following block:

predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
predictor.set_prediction_parameters(freq, prediction_length)
list_of_df = predictor.predict(time_series_training)

I'm getting a BrokenPipeError:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False

~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v

~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643 

BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

ProtocolError                             Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    319                 decode_content=False,
--> 320                 chunked=self._chunked(request.headers),
    321             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    726             retries = retries.increment(
--> 727                 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
    728             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
    378             # Disabled, indicate to re-raise the error.
--> 379             raise six.reraise(type(error), error, _stacktrace)
    380 

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/packages/six.py in reraise(tp, value, tb)
    733             if value.__traceback__ is not tb:
--> 734                 raise value.with_traceback(tb)
    735             raise value

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    676                 headers=headers,
--> 677                 chunked=chunked,
    678             )

~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    391         else:
--> 392             conn.request(method, url, **httplib_request_kw)
    393 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
   1261         """Send a complete request to the server."""
-> 1262         self._send_request(method, url, body, headers, encode_chunked)
   1263 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
     92         rval = super(AWSConnection, self)._send_request(
---> 93             method, url, body, headers, *args, **kwargs)
     94         self._expect_header_set = False

~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1307             body = _encode(body, 'body')
-> 1308         self.endheaders(body, encode_chunked=encode_chunked)
   1309 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
   1256             raise CannotSendHeader()
-> 1257         self._send_output(message_body, encode_chunked=encode_chunked)
   1258 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
    119             message_body = None
--> 120         self.send(msg)
    121         if self._expect_header_set:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
    203             return
--> 204         return super(AWSConnection, self).send(str)
    205 

~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
    995         try:
--> 996             self.sock.sendall(data)
    997         except TypeError:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
    974                 while count < amount:
--> 975                     v = self.send(byte_view[count:])
    976                     count += v

~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
    943                     self.__class__)
--> 944             return self._sslobj.write(data)
    945         else:

~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
    641         """
--> 642         return self._sslobj.write(data)
    643 

ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

During handling of the above exception, another exception occurred:

ConnectionClosedError                     Traceback (most recent call last)
<ipython-input-14-95dda20e8a70> in <module>
      1 predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
      2 predictor.set_prediction_parameters(freq, prediction_length)
----> 3 list_of_df = predictor.predict(time_series_training)

<ipython-input-13-a0fbac2b9b07> in predict(self, ts, num_samples, quantiles)
      7         prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
      8         req = self.__encode_request(ts, num_samples, quantiles)
----> 9         res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
     10         return self.__decode_response(res, prediction_times)
     11 

~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant)
    123 
    124         request_args = self._create_request_args(data, initial_args, target_model, target_variant)
--> 125         response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
    126         return self._handle_response(response)
    127 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    355                     "%s() only accepts keyword arguments." % py_operation_name)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358 
    359         _api_call.__name__ = str(py_operation_name)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    661         else:
    662             http, parsed_response = self._make_request(
--> 663                 operation_model, request_dict, request_context)
    664 
    665         self.meta.events.emit(

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
    680     def _make_request(self, operation_model, request_dict, request_context):
    681         try:
--> 682             return self._endpoint.make_request(operation_model, request_dict)
    683         except Exception as e:
    684             self.meta.events.emit(

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
    100         logger.debug("Making request for %s with params: %s",
    101                      operation_model, request_dict)
--> 102         return self._send_request(request_dict, operation_model)
    103 
    104     def create_request(self, params, operation_model=None):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
    135             request, operation_model, context)
    136         while self._needs_retry(attempts, operation_model, request_dict,
--> 137                                 success_response, exception):
    138             attempts += 1
    139             # If there is a stream associated with the request, we need

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
    254             event_name, response=response, endpoint=self,
    255             operation=operation_model, attempts=attempts,
--> 256             caught_exception=caught_exception, request_dict=request_dict)
    257         handler_response = first_non_none_response(responses)
    258         if handler_response is None:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    354     def emit(self, event_name, **kwargs):
    355         aliased_event_name = self._alias_event_name(event_name)
--> 356         return self._emitter.emit(aliased_event_name, **kwargs)
    357 
    358     def emit_until_response(self, event_name, **kwargs):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    226                  handlers.
    227         """
--> 228         return self._emit(event_name, kwargs)
    229 
    230     def emit_until_response(self, event_name, **kwargs):

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
    209         for handler in handlers_to_call:
    210             logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211             response = handler(**kwargs)
    212             responses.append((handler, response))
    213             if stop_on_response and response is not None:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempts, response, caught_exception, **kwargs)
    181 
    182         """
--> 183         if self._checker(attempts, response, caught_exception):
    184             result = self._action(attempts=attempts)
    185             logger.debug("Retry needed, action of: %s", result)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    249     def __call__(self, attempt_number, response, caught_exception):
    250         should_retry = self._should_retry(attempt_number, response,
--> 251                                           caught_exception)
    252         if should_retry:
    253             if attempt_number >= self._max_attempts:

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
    275             # If we've exceeded the max attempts we just let the exception
    276             # propogate if one has occurred.
--> 277             return self._checker(attempt_number, response, caught_exception)
    278 
    279 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    315         for checker in self._checkers:
    316             checker_response = checker(attempt_number, response,
--> 317                                        caught_exception)
    318             if checker_response:
    319                 return checker_response

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
    221         elif caught_exception is not None:
    222             return self._check_caught_exception(
--> 223                 attempt_number, caught_exception)
    224         else:
    225             raise ValueError("Both response and caught_exception are None.")

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
    357         # the MaxAttemptsDecorator is not interested in retrying the exception
    358         # then this exception just propogates out past the retry code.
--> 359         raise caught_exception

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _do_get_response(self, request, operation_model)
    198             http_response = first_non_none_response(responses)
    199             if http_response is None:
--> 200                 http_response = self._send(request)
    201         except HTTPClientError as e:
    202             return (None, e)

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send(self, request)
    267 
    268     def _send(self, request):
--> 269         return self.http_session.send(request)
    270 
    271 

~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
    349                 error=e,
    350                 request=request,
--> 351                 endpoint_url=request.url
    352             )
    353         except Exception as e:

ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL

Does somebody know why this happens?

Upvotes: 1

Views: 481

Answers (1)

Mzapp

Reputation: 81

I believe that Tarun might be on the right path. The BrokenPipeError you got is raised when the connection is closed abruptly; see the Python docs for BrokenPipeError. The SageMaker endpoint probably drops the connection as soon as your request goes over the 5 MB payload limit, so I suggest trying a smaller dataset. The data you send might also get inflated by the way sagemaker.tensorflow.model.TensorFlowPredictor encodes it, according to this comment on a similar issue.
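One way to stay under that limit without changing your data is to send the series in smaller batches and concatenate the results. A rough sketch (the batch_size of 10 is arbitrary, and predictor is an instance of your DeepARPredictor class):

def predict_in_batches(predictor, ts, batch_size=10, **kwargs):
    # Split the list of series into chunks so each request body stays
    # well below the endpoint's payload limit, then stitch the results together.
    results = []
    for i in range(0, len(ts), batch_size):
        results.extend(predictor.predict(ts[i:i + batch_size], **kwargs))
    return results

list_of_df = predict_in_batches(predictor, time_series_training, batch_size=10)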

If that doesn't work, I've also seen a couple of people having problems with their network in general, specifically firewall/antivirus software (for example this comment) or network timeouts.
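If it does turn out to be a timeout, you can give the runtime client a longer read timeout before building the predictor (a sketch only; the 120-second value and retry count are arbitrary choices):

import boto3
import sagemaker
from botocore.config import Config

# Build a SageMaker runtime client that waits longer for a response
runtime_client = boto3.client(
    "sagemaker-runtime",
    config=Config(read_timeout=120, retries={"max_attempts": 3}),
)
sagemaker_session = sagemaker.Session(sagemaker_runtime_client=runtime_client)

predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)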

Hope this points you in the right direction.

Upvotes: 2
