Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/endpoint.py: 23%
183 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
15import datetime
16import logging
17import os
18import threading
19import time
20import uuid
22from botocore import parsers
23from botocore.awsrequest import create_request_object
24from botocore.exceptions import HTTPClientError
25from botocore.history import get_global_history_recorder
26from botocore.hooks import first_non_none_response
27from botocore.httpchecksum import handle_checksum_body
28from botocore.httpsession import URLLib3Session
29from botocore.response import StreamingBody
30from botocore.utils import (
31 get_environ_proxies,
32 is_valid_endpoint_url,
33 is_valid_ipv6_endpoint_url,
34)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Recorder obtained from botocore.history; Endpoint._do_get_response uses it
# to record 'HTTP_REQUEST', 'HTTP_RESPONSE' and 'PARSED_RESPONSE' events.
history_recorder = get_global_history_recorder()
# Default value of the ``timeout`` argument of
# EndpointCreator.create_endpoint (presumably seconds -- confirm against the
# HTTP session class that receives it).
DEFAULT_TIMEOUT = 60
# Default value of the ``max_pool_connections`` argument of
# EndpointCreator.create_endpoint.
MAX_POOL_CONNECTIONS = 10
def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a response dict.

    The body is chosen from the status code and the operation's output
    type: error responses (status >= 300) and ordinary responses carry the
    fully read ``content``; event-stream operations carry the raw stream;
    streaming operations carry the raw stream wrapped in a
    ``StreamingBody`` sized from the ``content-length`` header.

    :param http_response: The HTTP response from a service request. It must
        expose ``headers``, ``status_code``, ``content`` and ``raw``.

    :param operation_model: The model of the operation that was invoked. Its
        ``name``, ``has_event_stream_output`` and ``has_streaming_output``
        attributes are read.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (taken from ``http_response.headers``)
        * status_code (taken from ``http_response.status_code``)
        * context (dict holding ``operation_name``)
        * body (content, raw stream, or ``StreamingBody``)

    """
    headers = http_response.headers
    status_code = http_response.status_code
    context = {'operation_name': operation_model.name}

    if status_code >= 300:
        # Error payloads are always read eagerly, even for streaming
        # operations.
        body = http_response.content
    elif operation_model.has_event_stream_output:
        body = http_response.raw
    elif operation_model.has_streaming_output:
        content_length = headers.get('content-length')
        body = StreamingBody(http_response.raw, content_length)
    else:
        body = http_response.content

    return {
        'headers': headers,
        'status_code': status_code,
        'context': context,
        'body': body,
    }
class Endpoint:
    """
    Represents an endpoint for a particular service in a specific
    region.  Only an endpoint can make requests.

    :ivar host: The endpoint host/URL this object was constructed with.
    :ivar http_session: The HTTP session used to send prepared requests.
        A new ``URLLib3Session`` is created when none is supplied.
    """

    def __init__(
        self,
        host,
        endpoint_prefix,
        event_emitter,
        response_parser_factory=None,
        http_session=None,
    ):
        """Store collaborators, creating defaults for any that are omitted.

        :param host: Value exposed as ``self.host`` and shown in ``repr``.
        :param endpoint_prefix: Service prefix, shown in ``repr``.
        :param event_emitter: Object whose ``emit`` method is called for the
            ``request-created``, ``before-send``, ``response-received`` and
            ``needs-retry`` events.
        :param response_parser_factory: Factory providing ``create_parser``;
            defaults to ``parsers.ResponseParserFactory()``.
        :param http_session: Session providing ``send`` and ``close``;
            defaults to ``URLLib3Session()``.
        """
        self._endpoint_prefix = endpoint_prefix
        self._event_emitter = event_emitter
        self.host = host
        # Not referenced by any method of this class as written; retained
        # as part of the instance's attribute set.
        self._lock = threading.Lock()
        if response_parser_factory is None:
            response_parser_factory = parsers.ResponseParserFactory()
        self._response_parser_factory = response_parser_factory
        self.http_session = http_session
        if self.http_session is None:
            self.http_session = URLLib3Session()

    def __repr__(self):
        return f'{self._endpoint_prefix}({self.host})'

    def close(self):
        """Close the underlying HTTP session."""
        self.http_session.close()

    def make_request(self, operation_model, request_dict):
        """Send ``request_dict`` for ``operation_model``, retrying as needed.

        :return: The ``(http_response, parsed_response)`` tuple produced by
            the final attempt.
        :raises Exception: Re-raises the exception captured on the final
            attempt when that attempt produced no response.
        """
        logger.debug(
            "Making request for %s with params: %s",
            operation_model,
            request_dict,
        )
        return self._send_request(request_dict, operation_model)

    def create_request(self, params, operation_model=None):
        """Build and prepare a request object from ``params``.

        When ``operation_model`` is given, the request is flagged as having
        streamed output if the operation streams or emits an event stream,
        and a ``request-created.<service-id>.<operation>`` event is emitted
        before the request is prepared.

        :return: The prepared request.
        """
        request = create_request_object(params)
        if operation_model:
            request.stream_output = any(
                (
                    operation_model.has_streaming_output,
                    operation_model.has_event_stream_output,
                )
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f'request-created.{service_id}.{operation_model.name}'
            self._event_emitter.emit(
                event_name,
                request=request,
                operation_name=operation_model.name,
            )
        return self.prepare_request(request)

    def _encode_headers(self, headers):
        """Encode, in place, every ``str`` header value to UTF-8 bytes."""
        # Only existing keys are reassigned, so the mapping's size does not
        # change while it is being iterated.
        for key, value in headers.items():
            if isinstance(value, str):
                headers[key] = value.encode('utf-8')

    def prepare_request(self, request):
        """Encode the request's headers and return ``request.prepare()``."""
        self._encode_headers(request.headers)
        return request.prepare()

    @staticmethod
    def _utcnow():
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to ``datetime.datetime.utcnow()`` without calling that
        method, which is deprecated as of Python 3.12.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )

    def _calculate_ttl(
        self, response_received_timestamp, date_header, read_timeout
    ):
        """Compute the TTL string for the retries context.

        The TTL is "now + read timeout", shifted by the clock skew estimated
        from the difference between the server's ``Date`` header and the
        local time at which the response was observed.

        :param response_received_timestamp: Naive UTC ``datetime`` taken
            locally for the previous response.
        :param date_header: ``Date`` header value, parsed with the format
            ``"%a, %d %b %Y %H:%M:%S %Z"``.
        :param read_timeout: Read timeout in seconds.
        :return: The TTL formatted as ``'%Y%m%dT%H%M%SZ'``.
        :raises ValueError: If ``date_header`` does not match the format.
        """
        local_timestamp = self._utcnow()
        date_conversion = datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S %Z"
        )
        estimated_skew = date_conversion - response_received_timestamp
        ttl = (
            local_timestamp
            + datetime.timedelta(seconds=read_timeout)
            + estimated_skew
        )
        return ttl.strftime('%Y%m%dT%H%M%SZ')

    def _set_ttl(self, retries_context, read_timeout, success_response):
        """Store a ``'ttl'`` entry in ``retries_context`` when possible.

        Nothing is stored when the previous response has no ``Date`` header
        or when the context is flagged ``has_streaming_input``.  Failures
        while computing the TTL are logged at debug level and otherwise
        ignored: the TTL is best-effort.
        """
        response_date_header = success_response[0].headers.get('Date')
        has_streaming_input = retries_context.get('has_streaming_input')
        if response_date_header and not has_streaming_input:
            try:
                response_received_timestamp = self._utcnow()
                retries_context['ttl'] = self._calculate_ttl(
                    response_received_timestamp,
                    response_date_header,
                    read_timeout,
                )
            except Exception:
                logger.debug(
                    "Exception received when updating retries context with TTL",
                    exc_info=True,
                )

    def _update_retries_context(self, context, attempt, success_response=None):
        """Record retry bookkeeping under ``context['retries']``.

        Sets ``attempt``, assigns an ``invocation-id`` (a random UUID string)
        the first time only, and, when a previous response is supplied,
        refreshes the TTL using ``context['client_config'].read_timeout``.
        """
        retries_context = context.setdefault('retries', {})
        retries_context['attempt'] = attempt
        if 'invocation-id' not in retries_context:
            retries_context['invocation-id'] = str(uuid.uuid4())

        if success_response:
            read_timeout = context['client_config'].read_timeout
            self._set_ttl(retries_context, read_timeout, success_response)

    def _send_request(self, request_dict, operation_model):
        """Send the request, repeating while ``_needs_retry`` says so.

        :return: ``(http_response, parsed_response)`` from the last attempt.
            When the parsed response has a ``ResponseMetadata`` entry, its
            ``RetryAttempts`` is set to the number of retries performed.
        :raises Exception: The exception captured on the last attempt, if
            that attempt did not yield a response.
        """
        attempts = 1
        context = request_dict['context']
        self._update_retries_context(context, attempts)
        request = self.create_request(request_dict, operation_model)
        success_response, exception = self._get_response(
            request, operation_model, context
        )
        while self._needs_retry(
            attempts,
            operation_model,
            request_dict,
            success_response,
            exception,
        ):
            attempts += 1
            self._update_retries_context(context, attempts, success_response)
            # If there is a stream associated with the request, we need
            # to reset it before attempting to send the request again.
            # This will ensure that we resend the entire contents of the
            # body.
            request.reset_stream()
            # Create a new request when retried (including a new signature).
            request = self.create_request(request_dict, operation_model)
            success_response, exception = self._get_response(
                request, operation_model, context
            )
        if (
            success_response is not None
            and 'ResponseMetadata' in success_response[1]
        ):
            # We want to share num retries, not num attempts.
            total_retries = attempts - 1
            success_response[1]['ResponseMetadata'][
                'RetryAttempts'
            ] = total_retries
        if exception is not None:
            raise exception
        return success_response

    def _get_response(self, request, operation_model, context):
        """Perform one attempt and emit the ``response-received`` event.

        :return: A ``(success_response, exception)`` tuple, where
            ``success_response`` is itself ``(http_response, parsed_dict)``.
            If an exception occurred ``success_response`` is ``None``;
            otherwise ``exception`` is ``None``.
        """
        success_response, exception = self._do_get_response(
            request, operation_model, context
        )
        kwargs_to_emit = {
            'response_dict': None,
            'parsed_response': None,
            'context': context,
            'exception': exception,
        }
        if success_response is not None:
            http_response, parsed_response = success_response
            kwargs_to_emit['parsed_response'] = parsed_response
            kwargs_to_emit['response_dict'] = convert_to_response_dict(
                http_response, operation_model
            )
        service_id = operation_model.service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f"response-received.{service_id}.{operation_model.name}",
            **kwargs_to_emit,
        )
        return success_response, exception

    def _do_get_response(self, request, operation_model, context):
        """Send ``request`` once and parse whatever comes back.

        A ``before-send`` handler may supply the HTTP response itself; only
        when no handler does is the request actually sent.  Exceptions
        raised while recording, emitting or sending are captured and
        returned; exceptions raised afterwards (checksum handling, parsing)
        propagate to the caller.

        :return: ``((http_response, parsed_response), None)`` on success or
            ``(None, exception)`` when sending failed.
        """
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record(
                'HTTP_REQUEST',
                {
                    'method': request.method,
                    'headers': request.headers,
                    'streaming': operation_model.has_streaming_input,
                    'url': request.url,
                    'body': request.body,
                },
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f"before-send.{service_id}.{operation_model.name}"
            responses = self._event_emitter.emit(event_name, request=request)
            http_response = first_non_none_response(responses)
            if http_response is None:
                http_response = self._send(request)
        except HTTPClientError as e:
            # Returned without the debug log used for other exceptions.
            return (None, e)
        except Exception as e:
            logger.debug(
                "Exception received when sending HTTP request.", exc_info=True
            )
            return (None, e)
        # This returns the http_response and the parsed_data.
        response_dict = convert_to_response_dict(
            http_response, operation_model
        )
        handle_checksum_body(
            http_response,
            response_dict,
            context,
            operation_model,
        )

        # Record a shallow copy so the extra 'streaming' key does not leak
        # into the dict handed to the parser.
        http_response_record_dict = response_dict.copy()
        http_response_record_dict[
            'streaming'
        ] = operation_model.has_streaming_output
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)

        protocol = operation_model.metadata['protocol']
        parser = self._response_parser_factory.create_parser(protocol)
        parsed_response = parser.parse(
            response_dict, operation_model.output_shape
        )
        # Do a second parsing pass to pick up on any modeled error fields
        # NOTE: Ideally, we would push this down into the parser classes but
        # they currently have no reference to the operation or service model
        # The parsers should probably take the operation model instead of
        # output shape but we can't change that now
        if http_response.status_code >= 300:
            self._add_modeled_error_fields(
                response_dict,
                parsed_response,
                operation_model,
                parser,
            )
        history_recorder.record('PARSED_RESPONSE', parsed_response)
        return (http_response, parsed_response), None

    def _add_modeled_error_fields(
        self,
        response_dict,
        parsed_response,
        operation_model,
        parser,
    ):
        """Merge fields of a modeled error shape into ``parsed_response``.

        Does nothing when the parsed response carries no error code or when
        the service model has no shape for that code.
        """
        error_code = parsed_response.get("Error", {}).get("Code")
        if error_code is None:
            return
        service_model = operation_model.service_model
        error_shape = service_model.shape_for_error_code(error_code)
        if error_shape is None:
            return
        modeled_parse = parser.parse(response_dict, error_shape)
        # TODO: avoid naming conflicts with ResponseMetadata and Error
        parsed_response.update(modeled_parse)

    def _needs_retry(
        self,
        attempts,
        operation_model,
        request_dict,
        response=None,
        caught_exception=None,
    ):
        """Ask ``needs-retry`` handlers whether to retry, sleeping if so.

        The first non-``None`` handler result is taken as the delay in
        seconds; this method sleeps for that long before returning ``True``.

        :return: ``True`` when the request should be retried, else ``False``.
        """
        service_id = operation_model.service_model.service_id.hyphenize()
        event_name = f"needs-retry.{service_id}.{operation_model.name}"
        responses = self._event_emitter.emit(
            event_name,
            response=response,
            endpoint=self,
            operation=operation_model,
            attempts=attempts,
            caught_exception=caught_exception,
            request_dict=request_dict,
        )
        handler_response = first_non_none_response(responses)
        if handler_response is None:
            return False
        # Request needs to be retried, and we need to sleep
        # for the specified number of times.
        logger.debug(
            "Response received to retry, sleeping for %s seconds",
            handler_response,
        )
        time.sleep(handler_response)
        return True

    def _send(self, request):
        """Send the prepared request through the HTTP session."""
        return self.http_session.send(request)
class EndpointCreator:
    """Builds ``Endpoint`` objects wired to a freshly configured HTTP session."""

    def __init__(self, event_emitter):
        """Remember the event emitter handed to every created endpoint."""
        self._event_emitter = event_emitter

    def create_endpoint(
        self,
        service_model,
        region_name,
        endpoint_url,
        verify=None,
        response_parser_factory=None,
        timeout=DEFAULT_TIMEOUT,
        max_pool_connections=MAX_POOL_CONNECTIONS,
        http_session_cls=URLLib3Session,
        proxies=None,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
    ):
        """Validate ``endpoint_url`` and return an ``Endpoint`` for it.

        :param service_model: Supplies ``endpoint_prefix`` for the endpoint.
        :param region_name: Accepted for interface compatibility; not used
            by this method.
        :param endpoint_url: URL of the endpoint; must pass
            ``is_valid_endpoint_url`` or ``is_valid_ipv6_endpoint_url``.
        :param verify: Explicit TLS verification setting.  ``None`` means
            "derive it from the environment" (see ``_get_verify_value``).
        :param response_parser_factory: Forwarded to ``Endpoint``.
        :param timeout: Forwarded to the HTTP session.
        :param max_pool_connections: Forwarded to the HTTP session.
        :param http_session_cls: Callable used to build the HTTP session.
        :param proxies: Proxy mapping; when ``None`` it is looked up from
            the environment for ``endpoint_url``.
        :param socket_options: Forwarded to the HTTP session.
        :param client_cert: Forwarded to the HTTP session.
        :param proxies_config: Forwarded to the HTTP session.
        :raises ValueError: If ``endpoint_url`` fails both validity checks.
        """
        if not is_valid_endpoint_url(
            endpoint_url
        ) and not is_valid_ipv6_endpoint_url(endpoint_url):
            raise ValueError(f"Invalid endpoint: {endpoint_url}")

        if proxies is None:
            proxies = self._get_proxies(endpoint_url)
        endpoint_prefix = service_model.endpoint_prefix

        logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
        http_session = http_session_cls(
            timeout=timeout,
            proxies=proxies,
            verify=self._get_verify_value(verify),
            max_pool_connections=max_pool_connections,
            socket_options=socket_options,
            client_cert=client_cert,
            proxies_config=proxies_config,
        )

        return Endpoint(
            endpoint_url,
            endpoint_prefix=endpoint_prefix,
            event_emitter=self._event_emitter,
            response_parser_factory=response_parser_factory,
            http_session=http_session,
        )

    def _get_proxies(self, url):
        """Return the proxy settings the environment defines for ``url``."""
        # We could also support getting proxies from a config file,
        # but for now proxy support is taken from the environment.
        return get_environ_proxies(url)

    def _get_verify_value(self, verify):
        """Resolve the TLS ``verify`` setting for a new HTTP session.

        :param verify: The caller's explicit choice, or ``None``.
        :return: ``verify`` itself when it is not ``None``; otherwise the
            non-empty value of ``REQUESTS_CA_BUNDLE``, or ``True`` when that
            variable is unset or empty.
        """
        # This is to account for:
        # https://github.com/kennethreitz/requests/issues/1436
        # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
        # own request objects.
        # First, if verify is not None, then the user explicitly specified
        # a value so this automatically wins.
        if verify is not None:
            return verify
        # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
        # True if the env var does not exist.  A variable that is set but
        # empty is treated like an unset one (as requests itself does):
        # returning '' would hand the session a falsy ``verify`` value.
        # NOTE(review): confirm how the session class interprets a falsy
        # ``verify``; it presumably means "do not verify".
        return os.environ.get('REQUESTS_CA_BUNDLE') or True