# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import datetime
import logging
import os
import threading
import time
import uuid

from botocore import parsers
from botocore.awsrequest import create_request_object
from botocore.exceptions import HTTPClientError
from botocore.history import get_global_history_recorder
from botocore.hooks import first_non_none_response
from botocore.httpchecksum import handle_checksum_body
from botocore.httpsession import URLLib3Session
from botocore.response import StreamingBody
from botocore.utils import (
    get_environ_proxies,
    is_valid_endpoint_url,
    is_valid_ipv6_endpoint_url,
)

logger = logging.getLogger(__name__)
history_recorder = get_global_history_recorder()
DEFAULT_TIMEOUT = 60
MAX_POOL_CONNECTIONS = 10


def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a response dict.

    This converts the HTTP response object to a dictionary that the
    response parsers can consume.

    :type http_response: botocore.awsrequest.AWSResponse
    :param http_response: The HTTP response from an AWS service request.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (dict)
        * status_code (int)
        * body (string or file-like object)
        * context (dict)

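    For example, a non-streaming response might be converted to something
    like the following (the operation name is illustrative)::

        {
            'headers': {'content-type': 'application/json'},
            'status_code': 200,
            'context': {'operation_name': 'ListBuckets'},
            'body': b'{...}',
        }
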
    """
    response_dict = {
        'headers': http_response.headers,
        'status_code': http_response.status_code,
        'context': {
            'operation_name': operation_model.name,
        },
    }
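    # Error responses (status >= 300) and non-streaming responses are read
    # into memory; event streams expose the raw response object, and other
    # streaming outputs are wrapped in StreamingBody so callers can read
    # them lazily.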
    if response_dict['status_code'] >= 300:
        response_dict['body'] = http_response.content
    elif operation_model.has_event_stream_output:
        response_dict['body'] = http_response.raw
    elif operation_model.has_streaming_output:
        length = response_dict['headers'].get('content-length')
        response_dict['body'] = StreamingBody(http_response.raw, length)
    else:
        response_dict['body'] = http_response.content
    return response_dict


class Endpoint:
    """
    Represents an endpoint for a particular service in a specific
    region. Only an endpoint can make requests.

    :ivar service: The Service object that describes this endpoint's
        service.
    :ivar host: The fully qualified endpoint hostname.
    :ivar session: The session object.
    """

    def __init__(
        self,
        host,
        endpoint_prefix,
        event_emitter,
        response_parser_factory=None,
        http_session=None,
    ):
        self._endpoint_prefix = endpoint_prefix
        self._event_emitter = event_emitter
        self.host = host
        self._lock = threading.Lock()
        if response_parser_factory is None:
            response_parser_factory = parsers.ResponseParserFactory()
        self._response_parser_factory = response_parser_factory
        self.http_session = http_session
        if self.http_session is None:
            self.http_session = URLLib3Session()

    def __repr__(self):
        return f'{self._endpoint_prefix}({self.host})'

    def close(self):
        self.http_session.close()

    def make_request(self, operation_model, request_dict):
        logger.debug(
            "Making request for %s with params: %s",
            operation_model,
            request_dict,
        )
        return self._send_request(request_dict, operation_model)

    def create_request(self, params, operation_model=None):
        request = create_request_object(params)
        if operation_model:
            request.stream_output = any(
                [
                    operation_model.has_streaming_output,
                    operation_model.has_event_stream_output,
                ]
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f'request-created.{service_id}.{operation_model.name}'
            self._event_emitter.emit(
                event_name,
                request=request,
                operation_name=operation_model.name,
            )
        prepared_request = self.prepare_request(request)
        return prepared_request

    def _encode_headers(self, headers):
        # In-place encoding of header values to utf-8 if they are str.
        for key, value in headers.items():
            if isinstance(value, str):
                headers[key] = value.encode('utf-8')

    def prepare_request(self, request):
        self._encode_headers(request.headers)
        return request.prepare()

    def _calculate_ttl(
        self, response_received_timestamp, date_header, read_timeout
    ):
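        # Estimate when the client will stop waiting for a response
        # (now + read_timeout), expressed on the service's clock by applying
        # the skew between the Date response header and the local time at
        # which that response was received.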
        local_timestamp = datetime.datetime.utcnow()
        date_conversion = datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S %Z"
        )
        estimated_skew = date_conversion - response_received_timestamp
        ttl = (
            local_timestamp
            + datetime.timedelta(seconds=read_timeout)
            + estimated_skew
        )
        return ttl.strftime('%Y%m%dT%H%M%SZ')

    def _set_ttl(self, retries_context, read_timeout, success_response):
        response_date_header = success_response[0].headers.get('Date')
        has_streaming_input = retries_context.get('has_streaming_input')
        if response_date_header and not has_streaming_input:
            try:
                response_received_timestamp = datetime.datetime.utcnow()
                retries_context['ttl'] = self._calculate_ttl(
                    response_received_timestamp,
                    response_date_header,
                    read_timeout,
                )
            except Exception:
                logger.debug(
                    "Exception received when updating retries context with TTL",
                    exc_info=True,
                )

    def _update_retries_context(self, context, attempt, success_response=None):
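        # The 'retries' context carries per-request retry metadata: the
        # current attempt number, an invocation id that stays constant
        # across attempts, and (once a response has been received) the TTL
        # estimate computed above.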
        retries_context = context.setdefault('retries', {})
        retries_context['attempt'] = attempt
        if 'invocation-id' not in retries_context:
            retries_context['invocation-id'] = str(uuid.uuid4())

        if success_response:
            read_timeout = context['client_config'].read_timeout
            self._set_ttl(retries_context, read_timeout, success_response)

    def _send_request(self, request_dict, operation_model):
        attempts = 1
        context = request_dict['context']
        self._update_retries_context(context, attempts)
        request = self.create_request(request_dict, operation_model)
        success_response, exception = self._get_response(
            request, operation_model, context
        )
        while self._needs_retry(
            attempts,
            operation_model,
            request_dict,
            success_response,
            exception,
        ):
            attempts += 1
            self._update_retries_context(context, attempts, success_response)
            # If there is a stream associated with the request, we need
            # to reset it before attempting to send the request again.
            # This will ensure that we resend the entire contents of the
            # body.
            request.reset_stream()
            # Create a new request when retried (including a new signature).
            request = self.create_request(request_dict, operation_model)
            success_response, exception = self._get_response(
                request, operation_model, context
            )
        if (
            success_response is not None
            and 'ResponseMetadata' in success_response[1]
        ):
            # We want to share num retries, not num attempts.
            total_retries = attempts - 1
            success_response[1]['ResponseMetadata']['RetryAttempts'] = (
                total_retries
            )
        if exception is not None:
            raise exception
        else:
            return success_response

    def _get_response(self, request, operation_model, context):
        # This will return a tuple of (success_response, exception)
        # and success_response is itself a tuple of
        # (http_response, parsed_dict).
        # If an exception occurs then the success_response is None.
        # If no exception occurs then exception is None.
        success_response, exception = self._do_get_response(
            request, operation_model, context
        )
        kwargs_to_emit = {
            'response_dict': None,
            'parsed_response': None,
            'context': context,
            'exception': exception,
        }
        if success_response is not None:
            http_response, parsed_response = success_response
            kwargs_to_emit['parsed_response'] = parsed_response
            kwargs_to_emit['response_dict'] = convert_to_response_dict(
                http_response, operation_model
            )
        service_id = operation_model.service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f"response-received.{service_id}.{operation_model.name}",
            **kwargs_to_emit,
        )
        return success_response, exception

    def _do_get_response(self, request, operation_model, context):
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record(
                'HTTP_REQUEST',
                {
                    'method': request.method,
                    'headers': request.headers,
                    'streaming': operation_model.has_streaming_input,
                    'url': request.url,
                    'body': request.body,
                },
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f"before-send.{service_id}.{operation_model.name}"
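            # A 'before-send' handler may return a response object directly
            # (for example, when stubbing responses); the first non-None
            # handler response is used in place of sending the HTTP request.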
            responses = self._event_emitter.emit(event_name, request=request)
            http_response = first_non_none_response(responses)
            if http_response is None:
                http_response = self._send(request)
        except HTTPClientError as e:
            return (None, e)
        except Exception as e:
            logger.debug(
                "Exception received when sending HTTP request.", exc_info=True
            )
            return (None, e)
        # This returns the http_response and the parsed_data.
        response_dict = convert_to_response_dict(
            http_response, operation_model
        )
        handle_checksum_body(
            http_response,
            response_dict,
            context,
            operation_model,
        )

        http_response_record_dict = response_dict.copy()
        http_response_record_dict['streaming'] = (
            operation_model.has_streaming_output
        )
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)

        protocol = operation_model.service_model.resolved_protocol
        customized_response_dict = {}
        self._event_emitter.emit(
            f"before-parse.{service_id}.{operation_model.name}",
            operation_model=operation_model,
            response_dict=response_dict,
            customized_response_dict=customized_response_dict,
        )
        parser = self._response_parser_factory.create_parser(protocol)
        parsed_response = parser.parse(
            response_dict, operation_model.output_shape
        )
        parsed_response.update(customized_response_dict)
        # Do a second parsing pass to pick up on any modeled error fields
        # NOTE: Ideally, we would push this down into the parser classes but
        # they currently have no reference to the operation or service model
        # The parsers should probably take the operation model instead of
        # output shape but we can't change that now
        if http_response.status_code >= 300:
            self._add_modeled_error_fields(
                response_dict,
                parsed_response,
                operation_model,
                parser,
            )
        history_recorder.record('PARSED_RESPONSE', parsed_response)
        return (http_response, parsed_response), None

    def _add_modeled_error_fields(
        self,
        response_dict,
        parsed_response,
        operation_model,
        parser,
    ):
        error_code = parsed_response.get("Error", {}).get("Code")
        if error_code is None:
            return
        service_model = operation_model.service_model
        error_shape = service_model.shape_for_error_code(error_code)
        if error_shape is None:
            return
        modeled_parse = parser.parse(response_dict, error_shape)
        # TODO: avoid naming conflicts with ResponseMetadata and Error
        parsed_response.update(modeled_parse)

    def _needs_retry(
        self,
        attempts,
        operation_model,
        request_dict,
        response=None,
        caught_exception=None,
    ):
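        # Handlers subscribed to the 'needs-retry' event return None when no
        # retry is needed, or the number of seconds to sleep before retrying.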
        service_id = operation_model.service_model.service_id.hyphenize()
        event_name = f"needs-retry.{service_id}.{operation_model.name}"
        responses = self._event_emitter.emit(
            event_name,
            response=response,
            endpoint=self,
            operation=operation_model,
            attempts=attempts,
            caught_exception=caught_exception,
            request_dict=request_dict,
        )
        handler_response = first_non_none_response(responses)
        if handler_response is None:
            return False
        else:
            # Request needs to be retried, and we need to sleep
            # for the specified number of seconds.
            logger.debug(
                "Response received to retry, sleeping for %s seconds",
                handler_response,
            )
            time.sleep(handler_response)
            return True

    def _send(self, request):
        return self.http_session.send(request)


class EndpointCreator:
    def __init__(self, event_emitter):
        self._event_emitter = event_emitter

    def create_endpoint(
        self,
        service_model,
        region_name,
        endpoint_url,
        verify=None,
        response_parser_factory=None,
        timeout=DEFAULT_TIMEOUT,
        max_pool_connections=MAX_POOL_CONNECTIONS,
        http_session_cls=URLLib3Session,
        proxies=None,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
    ):
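        # Reject endpoint URLs that are neither valid hostname/IPv4
        # endpoints nor valid IPv6 endpoints.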
        if not is_valid_endpoint_url(
            endpoint_url
        ) and not is_valid_ipv6_endpoint_url(endpoint_url):
            raise ValueError(f"Invalid endpoint: {endpoint_url}")

        if proxies is None:
            proxies = self._get_proxies(endpoint_url)
        endpoint_prefix = service_model.endpoint_prefix

        logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
        http_session = http_session_cls(
            timeout=timeout,
            proxies=proxies,
            verify=self._get_verify_value(verify),
            max_pool_connections=max_pool_connections,
            socket_options=socket_options,
            client_cert=client_cert,
            proxies_config=proxies_config,
        )

        return Endpoint(
            endpoint_url,
            endpoint_prefix=endpoint_prefix,
            event_emitter=self._event_emitter,
            response_parser_factory=response_parser_factory,
            http_session=http_session,
        )

    def _get_proxies(self, url):
        # We could also support getting proxies from a config file,
        # but for now proxy support is taken from the environment.
        return get_environ_proxies(url)

    def _get_verify_value(self, verify):
        # This is to account for:
        # https://github.com/kennethreitz/requests/issues/1436
        # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
        # own request objects.
        # First, if verify is not None, then the user explicitly specified
        # a value so this automatically wins.
        if verify is not None:
            return verify
        # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
        # True if the env var does not exist.
        return os.environ.get('REQUESTS_CA_BUNDLE', True)
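

# Minimal usage sketch (illustrative only; application code normally goes
# through botocore.session / botocore.client rather than using this module
# directly, and event_emitter, service_model, operation_model, and
# request_dict below are assumed to come from those layers):
#
#     creator = EndpointCreator(event_emitter)
#     endpoint = creator.create_endpoint(
#         service_model,
#         region_name='us-east-1',
#         endpoint_url='https://ec2.us-east-1.amazonaws.com',
#     )
#     http_response, parsed = endpoint.make_request(
#         operation_model, request_dict
#     )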