Reputation: 134
I've created an SageMaker Endpoint from a trained DeepAR-Model using following code:
job_name = estimator.latest_training_job.job_name
endpoint_name = sagemaker_session.endpoint_from_job(
job_name=job_name,
initial_instance_count=1,
instance_type="ml.m4.xlarge",
image_uri=image_uri,
role=role
)
Now I want to test my model using a test.json
-Dataset (66.2MB).
I've created that file according to various tutorials/sample-notebooks (same as train.json
, but with prediction-length
-less values.
For that, I've written the following code:
class DeepARPredictor(sagemaker.predictor.Predictor):
def set_prediction_parameters(self, freq, prediction_length):
self.freq = freq
self.prediction_length = prediction_length
def predict(self, ts, num_samples=100, quantiles=["0.1", "0.5", "0.9"]):
prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
req = self.__encode_request(ts, num_samples, quantiles)
res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
return self.__decode_response(res, prediction_times)
def __encode_request(self, ts, num_samples, quantiles):
instances = [{"start": str(ts[k].index[0]), "target": list(ts[k])} for k in range(len(ts))]
configuration = {
"num_samples": num_samples,
"output_types": ["quantiles"],
"quantiles": quantiles,
}
http_request_data = {"instances": instances, "configuration": configuration}
return json.dumps(http_request_data).encode( "utf-8")
def __decode_response(self, response, prediction_times):
response_data = json.loads(response.decode("utf-8"))
list_of_df = []
for k in range(len(prediction_times)):
prediction_index = pd.date_range(
start=prediction_times[k], freq=self.freq, periods=self.prediction_length
)
list_of_df.append(
pd.DataFrame(data=response_data["predictions"][k]["quantiles"], index=prediction_index)
)
return list_of_df
But after running the following block:
predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
predictor.set_prediction_parameters(freq, prediction_length)
list_of_df = predictor.predict(time_series_training)
I've getting a BrokenPipeError:
---------------------------------------------------------------------------
BrokenPipeError Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
676 headers=headers,
--> 677 chunked=chunked,
678 )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
391 else:
--> 392 conn.request(method, url, **httplib_request_kw)
393
~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
1261 """Send a complete request to the server."""
-> 1262 self._send_request(method, url, body, headers, encode_chunked)
1263
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
92 rval = super(AWSConnection, self)._send_request(
---> 93 method, url, body, headers, *args, **kwargs)
94 self._expect_header_set = False
~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
1307 body = _encode(body, 'body')
-> 1308 self.endheaders(body, encode_chunked=encode_chunked)
1309
~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
1256 raise CannotSendHeader()
-> 1257 self._send_output(message_body, encode_chunked=encode_chunked)
1258
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
119 message_body = None
--> 120 self.send(msg)
121 if self._expect_header_set:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
203 return
--> 204 return super(AWSConnection, self).send(str)
205
~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
995 try:
--> 996 self.sock.sendall(data)
997 except TypeError:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
974 while count < amount:
--> 975 v = self.send(byte_view[count:])
976 count += v
~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
943 self.__class__)
--> 944 return self._sslobj.write(data)
945 else:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
641 """
--> 642 return self._sslobj.write(data)
643
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
ProtocolError Traceback (most recent call last)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
319 decode_content=False,
--> 320 chunked=self._chunked(request.headers),
321 )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
726 retries = retries.increment(
--> 727 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
728 )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
378 # Disabled, indicate to re-raise the error.
--> 379 raise six.reraise(type(error), error, _stacktrace)
380
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/packages/six.py in reraise(tp, value, tb)
733 if value.__traceback__ is not tb:
--> 734 raise value.with_traceback(tb)
735 raise value
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
676 headers=headers,
--> 677 chunked=chunked,
678 )
~/anaconda3/envs/python3/lib/python3.6/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
391 else:
--> 392 conn.request(method, url, **httplib_request_kw)
393
~/anaconda3/envs/python3/lib/python3.6/http/client.py in request(self, method, url, body, headers, encode_chunked)
1261 """Send a complete request to the server."""
-> 1262 self._send_request(method, url, body, headers, encode_chunked)
1263
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_request(self, method, url, body, headers, *args, **kwargs)
92 rval = super(AWSConnection, self)._send_request(
---> 93 method, url, body, headers, *args, **kwargs)
94 self._expect_header_set = False
~/anaconda3/envs/python3/lib/python3.6/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
1307 body = _encode(body, 'body')
-> 1308 self.endheaders(body, encode_chunked=encode_chunked)
1309
~/anaconda3/envs/python3/lib/python3.6/http/client.py in endheaders(self, message_body, encode_chunked)
1256 raise CannotSendHeader()
-> 1257 self._send_output(message_body, encode_chunked=encode_chunked)
1258
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in _send_output(self, message_body, *args, **kwargs)
119 message_body = None
--> 120 self.send(msg)
121 if self._expect_header_set:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/awsrequest.py in send(self, str)
203 return
--> 204 return super(AWSConnection, self).send(str)
205
~/anaconda3/envs/python3/lib/python3.6/http/client.py in send(self, data)
995 try:
--> 996 self.sock.sendall(data)
997 except TypeError:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in sendall(self, data, flags)
974 while count < amount:
--> 975 v = self.send(byte_view[count:])
976 count += v
~/anaconda3/envs/python3/lib/python3.6/ssl.py in send(self, data, flags)
943 self.__class__)
--> 944 return self._sslobj.write(data)
945 else:
~/anaconda3/envs/python3/lib/python3.6/ssl.py in write(self, data)
641 """
--> 642 return self._sslobj.write(data)
643
ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
During handling of the above exception, another exception occurred:
ConnectionClosedError Traceback (most recent call last)
<ipython-input-14-95dda20e8a70> in <module>
1 predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
2 predictor.set_prediction_parameters(freq, prediction_length)
----> 3 list_of_df = predictor.predict(time_series_training)
<ipython-input-13-a0fbac2b9b07> in predict(self, ts, num_samples, quantiles)
7 prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
8 req = self.__encode_request(ts, num_samples, quantiles)
----> 9 res = super(DeepARPredictor, self).predict(req, initial_args={"ContentType": "application/json"})
10 return self.__decode_response(res, prediction_times)
11
~/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker/predictor.py in predict(self, data, initial_args, target_model, target_variant)
123
124 request_args = self._create_request_args(data, initial_args, target_model, target_variant)
--> 125 response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
126 return self._handle_response(response)
127
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
355 "%s() only accepts keyword arguments." % py_operation_name)
356 # The "self" in this scope is referring to the BaseClient.
--> 357 return self._make_api_call(operation_name, kwargs)
358
359 _api_call.__name__ = str(py_operation_name)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
661 else:
662 http, parsed_response = self._make_request(
--> 663 operation_model, request_dict, request_context)
664
665 self.meta.events.emit(
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
680 def _make_request(self, operation_model, request_dict, request_context):
681 try:
--> 682 return self._endpoint.make_request(operation_model, request_dict)
683 except Exception as e:
684 self.meta.events.emit(
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
100 logger.debug("Making request for %s with params: %s",
101 operation_model, request_dict)
--> 102 return self._send_request(request_dict, operation_model)
103
104 def create_request(self, params, operation_model=None):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
135 request, operation_model, context)
136 while self._needs_retry(attempts, operation_model, request_dict,
--> 137 success_response, exception):
138 attempts += 1
139 # If there is a stream associated with the request, we need
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _needs_retry(self, attempts, operation_model, request_dict, response, caught_exception)
254 event_name, response=response, endpoint=self,
255 operation=operation_model, attempts=attempts,
--> 256 caught_exception=caught_exception, request_dict=request_dict)
257 handler_response = first_non_none_response(responses)
258 if handler_response is None:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
354 def emit(self, event_name, **kwargs):
355 aliased_event_name = self._alias_event_name(event_name)
--> 356 return self._emitter.emit(aliased_event_name, **kwargs)
357
358 def emit_until_response(self, event_name, **kwargs):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
226 handlers.
227 """
--> 228 return self._emit(event_name, kwargs)
229
230 def emit_until_response(self, event_name, **kwargs):
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
209 for handler in handlers_to_call:
210 logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211 response = handler(**kwargs)
212 responses.append((handler, response))
213 if stop_on_response and response is not None:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempts, response, caught_exception, **kwargs)
181
182 """
--> 183 if self._checker(attempts, response, caught_exception):
184 result = self._action(attempts=attempts)
185 logger.debug("Retry needed, action of: %s", result)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
249 def __call__(self, attempt_number, response, caught_exception):
250 should_retry = self._should_retry(attempt_number, response,
--> 251 caught_exception)
252 if should_retry:
253 if attempt_number >= self._max_attempts:
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _should_retry(self, attempt_number, response, caught_exception)
275 # If we've exceeded the max attempts we just let the exception
276 # propogate if one has occurred.
--> 277 return self._checker(attempt_number, response, caught_exception)
278
279
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
315 for checker in self._checkers:
316 checker_response = checker(attempt_number, response,
--> 317 caught_exception)
318 if checker_response:
319 return checker_response
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in __call__(self, attempt_number, response, caught_exception)
221 elif caught_exception is not None:
222 return self._check_caught_exception(
--> 223 attempt_number, caught_exception)
224 else:
225 raise ValueError("Both response and caught_exception are None.")
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/retryhandler.py in _check_caught_exception(self, attempt_number, caught_exception)
357 # the MaxAttemptsDecorator is not interested in retrying the exception
358 # then this exception just propogates out past the retry code.
--> 359 raise caught_exception
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _do_get_response(self, request, operation_model)
198 http_response = first_non_none_response(responses)
199 if http_response is None:
--> 200 http_response = self._send(request)
201 except HTTPClientError as e:
202 return (None, e)
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/endpoint.py in _send(self, request)
267
268 def _send(self, request):
--> 269 return self.http_session.send(request)
270
271
~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/httpsession.py in send(self, request)
349 error=e,
350 request=request,
--> 351 endpoint_url=request.url
352 )
353 except Exception as e:
ConnectionClosedError: Connection was closed before we received a valid response from endpoint URL
Somebody know's why this happens?
Upvotes: 1
Views: 481
Reputation: 81
I believe that Tarun might on the right path. The BrokenPipeError that you got is thrown when the connection is abruptly closed. See the python docs for BrokenPipeError. The SageMaker endpoint probably drops the connection as soon as you go over the limit of 5MB. I suggest you try a smaller dataset. Also the data you send might get enlarged because of how sagemaker.tensorflow.model.TensorFlowPredictor encodes the data according to this comment on a similar issue.
If that doesn't work I've also seen a couple of people having problems with their networks in general. Specifically firewall/antivirus (for example this comment) or network timeout.
Hope this points you in the right direction.
Upvotes: 2