1# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
2# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"). You
5# may not use this file except in compliance with the License. A copy of
6# the License is located at
7#
8# http://aws.amazon.com/apache2.0/
9#
10# or in the "license" file accompanying this file. This file is
11# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
12# ANY KIND, either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14
15import datetime
16import logging
17import os
18import threading
19import time
20import uuid
21
22from botocore import parsers
23from botocore.awsrequest import create_request_object
24from botocore.exceptions import HTTPClientError
25from botocore.history import get_global_history_recorder
26from botocore.hooks import first_non_none_response
27from botocore.httpchecksum import handle_checksum_body
28from botocore.httpsession import URLLib3Session
29from botocore.response import StreamingBody
30from botocore.utils import (
31 get_environ_proxies,
32 is_valid_endpoint_url,
33 is_valid_ipv6_endpoint_url,
34)
35
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Recorder returned by ``get_global_history_recorder()``; the endpoint
# records HTTP_REQUEST, HTTP_RESPONSE and PARSED_RESPONSE events on it.
history_recorder = get_global_history_recorder()
# Default ``timeout`` handed to the HTTP session class by
# ``EndpointCreator.create_endpoint``.
# NOTE(review): presumably seconds -- confirm against the session class.
DEFAULT_TIMEOUT = 60
# Default ``max_pool_connections`` handed to the HTTP session class by
# ``EndpointCreator.create_endpoint``.
MAX_POOL_CONNECTIONS = 10
40
41
def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a response dict.

    :param http_response: The HTTP response from an AWS service request.
        Its ``headers``, ``status_code``, ``content`` and ``raw``
        attributes are read here.

    :param operation_model: The model of the operation that was invoked.
        Its ``name``, ``has_event_stream_output`` and
        ``has_streaming_output`` attributes are read here.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (the response's ``headers`` object)
        * status_code (int)
        * context (dict holding ``operation_name``)
        * body (the response content, the raw stream, or a
          ``StreamingBody`` wrapping the raw stream)

    """
    headers = http_response.headers
    status_code = http_response.status_code
    operation_name = operation_model.name

    if status_code >= 300:
        # Error responses always use the fully-read content, even for
        # operations that normally stream their output.
        body = http_response.content
    elif operation_model.has_event_stream_output:
        body = http_response.raw
    elif operation_model.has_streaming_output:
        body = StreamingBody(
            http_response.raw, headers.get('content-length')
        )
    else:
        body = http_response.content

    return {
        'headers': headers,
        'status_code': status_code,
        'context': {
            'operation_name': operation_name,
        },
        'body': body,
    }
75
76
class Endpoint:
    """
    Represents an endpoint for a particular service in a specific
    region. Only an endpoint can make requests.

    :ivar host: The endpoint host value given to the constructor
        (``EndpointCreator.create_endpoint`` passes the endpoint URL).
    :ivar http_session: The HTTP session object used to send prepared
        requests.
    """

    def __init__(
        self,
        host,
        endpoint_prefix,
        event_emitter,
        response_parser_factory=None,
        http_session=None,
    ):
        """Create an endpoint.

        :param host: Stored as ``self.host`` and shown in ``repr()``.
        :param endpoint_prefix: Service endpoint prefix, shown in ``repr()``.
        :param event_emitter: Object whose ``emit(event_name, **kwargs)``
            is called for the request/response lifecycle events.
        :param response_parser_factory: Factory providing
            ``create_parser(protocol)``. Defaults to a new
            ``parsers.ResponseParserFactory()``.
        :param http_session: Object providing ``send(request)`` and
            ``close()``. Defaults to a new ``URLLib3Session()``.
        """
        self._endpoint_prefix = endpoint_prefix
        self._event_emitter = event_emitter
        self.host = host
        # Not used by any method of this class; retained because code
        # outside this class may rely on the attribute.
        self._lock = threading.Lock()
        if response_parser_factory is None:
            response_parser_factory = parsers.ResponseParserFactory()
        self._response_parser_factory = response_parser_factory
        self.http_session = http_session
        if self.http_session is None:
            self.http_session = URLLib3Session()

    def __repr__(self):
        return f'{self._endpoint_prefix}({self.host})'

    @staticmethod
    def _utcnow_naive():
        """Return the current UTC time as a naive ``datetime``.

        Drop-in replacement for ``datetime.datetime.utcnow()``, which is
        deprecated as of Python 3.12. The tzinfo is stripped so the value
        can be combined with the naive datetimes produced by ``strptime``.
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )

    def close(self):
        """Close the underlying HTTP session."""
        self.http_session.close()

    def make_request(self, operation_model, request_dict):
        """Send the request described by ``request_dict``.

        :return: The ``(http_response, parsed_response)`` tuple produced
            by ``_send_request``.
        :raises Exception: whatever exception the final attempt produced.
        """
        logger.debug(
            "Making request for %s with params: %s",
            operation_model,
            request_dict,
        )
        return self._send_request(request_dict, operation_model)

    def create_request(self, params, operation_model=None):
        """Build and prepare a request from ``params``.

        When ``operation_model`` is given, ``stream_output`` is set on the
        request and a ``request-created.<service-id>.<operation>`` event is
        emitted before the request is prepared.

        :return: The prepared request.
        """
        request = create_request_object(params)
        if operation_model:
            request.stream_output = any(
                (
                    operation_model.has_streaming_output,
                    operation_model.has_event_stream_output,
                )
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f'request-created.{service_id}.{operation_model.name}'
            self._event_emitter.emit(
                event_name,
                request=request,
                operation_name=operation_model.name,
            )
        return self.prepare_request(request)

    def _encode_headers(self, headers):
        """Encode ``str`` header values to UTF-8 bytes, in place.

        Only existing keys are reassigned, so mutating the mapping while
        iterating it is safe here. Non-``str`` values are left untouched.
        """
        for key, value in headers.items():
            if isinstance(value, str):
                headers[key] = value.encode('utf-8')

    def prepare_request(self, request):
        """Encode the request's headers and return ``request.prepare()``."""
        self._encode_headers(request.headers)
        return request.prepare()

    def _calculate_ttl(
        self, response_received_timestamp, date_header, read_timeout
    ):
        """Compute the TTL string stored in the retries context.

        The TTL is ``now + read_timeout + estimated_skew`` where the skew
        is the response's ``Date`` header minus the local time at which
        the response was received.

        :param response_received_timestamp: Naive ``datetime`` taken when
            the response was received.
        :param date_header: ``Date`` header value in the form
            ``"%a, %d %b %Y %H:%M:%S %Z"``.
        :param read_timeout: Number of seconds added to the TTL.
        :return: The TTL formatted as ``%Y%m%dT%H%M%SZ``.
        :raises ValueError: if ``date_header`` does not match the format.
        """
        local_timestamp = self._utcnow_naive()
        date_conversion = datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S %Z"
        )
        estimated_skew = date_conversion - response_received_timestamp
        ttl = (
            local_timestamp
            + datetime.timedelta(seconds=read_timeout)
            + estimated_skew
        )
        return ttl.strftime('%Y%m%dT%H%M%SZ')

    def _set_ttl(self, retries_context, read_timeout, success_response):
        """Best-effort update of ``retries_context['ttl']``.

        Nothing is set when the response has no ``Date`` header or when
        the retries context is flagged with ``has_streaming_input``. Any
        failure while computing the TTL is logged at debug level and
        otherwise ignored, so a malformed header never fails the request.
        """
        response_date_header = success_response[0].headers.get('Date')
        has_streaming_input = retries_context.get('has_streaming_input')
        if response_date_header and not has_streaming_input:
            try:
                response_received_timestamp = self._utcnow_naive()
                retries_context['ttl'] = self._calculate_ttl(
                    response_received_timestamp,
                    response_date_header,
                    read_timeout,
                )
            except Exception:
                logger.debug(
                    "Exception received when updating retries context with TTL",
                    exc_info=True,
                )

    def _update_retries_context(self, context, attempt, success_response=None):
        """Record the attempt number (and TTL, if possible) in ``context``.

        ``context['retries']`` is created on first use. The
        ``invocation-id`` is generated once and reused across attempts.
        When ``success_response`` is truthy, ``context['client_config']``
        must be present since its ``read_timeout`` feeds the TTL.
        """
        retries_context = context.setdefault('retries', {})
        retries_context['attempt'] = attempt
        if 'invocation-id' not in retries_context:
            retries_context['invocation-id'] = str(uuid.uuid4())

        if success_response:
            read_timeout = context['client_config'].read_timeout
            self._set_ttl(retries_context, read_timeout, success_response)

    def _send_request(self, request_dict, operation_model):
        """Send a request, retrying for as long as ``_needs_retry`` says so.

        :return: ``(http_response, parsed_response)`` from the last attempt.
        :raises Exception: the exception from the last attempt, if any.
        """
        attempts = 1
        context = request_dict['context']
        self._update_retries_context(context, attempts)
        request = self.create_request(request_dict, operation_model)
        success_response, exception = self._get_response(
            request, operation_model, context
        )
        while self._needs_retry(
            attempts,
            operation_model,
            request_dict,
            success_response,
            exception,
        ):
            attempts += 1
            self._update_retries_context(context, attempts, success_response)
            # If there is a stream associated with the request, we need
            # to reset it before attempting to send the request again.
            # This will ensure that we resend the entire contents of the
            # body.
            request.reset_stream()
            # Create a new request when retried (including a new signature).
            request = self.create_request(request_dict, operation_model)
            success_response, exception = self._get_response(
                request, operation_model, context
            )
        if (
            success_response is not None
            and 'ResponseMetadata' in success_response[1]
        ):
            # We want to share num retries, not num attempts.
            total_retries = attempts - 1
            success_response[1]['ResponseMetadata']['RetryAttempts'] = (
                total_retries
            )
        if exception is not None:
            raise exception
        return success_response

    def _get_response(self, request, operation_model, context):
        """Perform one attempt and emit the ``response-received`` event.

        :return: A ``(success_response, exception)`` tuple where
            ``success_response`` is itself ``(http_response, parsed_dict)``.
            If an exception occurred ``success_response`` is ``None``;
            otherwise ``exception`` is ``None``.
        """
        success_response, exception = self._do_get_response(
            request, operation_model, context
        )
        kwargs_to_emit = {
            'response_dict': None,
            'parsed_response': None,
            'context': context,
            'exception': exception,
        }
        if success_response is not None:
            http_response, parsed_response = success_response
            kwargs_to_emit['parsed_response'] = parsed_response
            kwargs_to_emit['response_dict'] = convert_to_response_dict(
                http_response, operation_model
            )
        service_id = operation_model.service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f"response-received.{service_id}.{operation_model.name}",
            **kwargs_to_emit,
        )
        return success_response, exception

    def _do_get_response(self, request, operation_model, context):
        """Send ``request`` once and parse the HTTP response.

        A ``before-send`` handler may supply the HTTP response itself, in
        which case nothing is sent over the HTTP session.

        :return: ``((http_response, parsed_response), None)`` on success,
            or ``(None, exception)`` if sending raised.
        """
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record(
                'HTTP_REQUEST',
                {
                    'method': request.method,
                    'headers': request.headers,
                    'streaming': operation_model.has_streaming_input,
                    'url': request.url,
                    'body': request.body,
                },
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f"before-send.{service_id}.{operation_model.name}"
            responses = self._event_emitter.emit(event_name, request=request)
            http_response = first_non_none_response(responses)
            if http_response is None:
                http_response = self._send(request)
        except HTTPClientError as e:
            # Returned without the debug log used for other exceptions.
            return (None, e)
        except Exception as e:
            logger.debug(
                "Exception received when sending HTTP request.", exc_info=True
            )
            return (None, e)
        # This returns the http_response and the parsed_data.
        response_dict = convert_to_response_dict(
            http_response, operation_model
        )
        handle_checksum_body(
            http_response,
            response_dict,
            context,
            operation_model,
        )

        # Shallow copy so the extra 'streaming' key is only recorded in
        # the history and never reaches the parser.
        http_response_record_dict = response_dict.copy()
        http_response_record_dict['streaming'] = (
            operation_model.has_streaming_output
        )
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)

        protocol = operation_model.metadata['protocol']
        customized_response_dict = {}
        self._event_emitter.emit(
            f"before-parse.{service_id}.{operation_model.name}",
            operation_model=operation_model,
            response_dict=response_dict,
            customized_response_dict=customized_response_dict,
        )
        parser = self._response_parser_factory.create_parser(protocol)
        parsed_response = parser.parse(
            response_dict, operation_model.output_shape
        )
        parsed_response.update(customized_response_dict)
        # Do a second parsing pass to pick up on any modeled error fields
        # NOTE: Ideally, we would push this down into the parser classes but
        # they currently have no reference to the operation or service model
        # The parsers should probably take the operation model instead of
        # output shape but we can't change that now
        if http_response.status_code >= 300:
            self._add_modeled_error_fields(
                response_dict,
                parsed_response,
                operation_model,
                parser,
            )
        history_recorder.record('PARSED_RESPONSE', parsed_response)
        return (http_response, parsed_response), None

    def _add_modeled_error_fields(
        self,
        response_dict,
        parsed_response,
        operation_model,
        parser,
    ):
        """Merge fields of a modeled error shape into ``parsed_response``.

        Does nothing when the parsed response carries no error code or
        when the service model has no shape for that code.
        """
        error_code = parsed_response.get("Error", {}).get("Code")
        if error_code is None:
            return
        service_model = operation_model.service_model
        error_shape = service_model.shape_for_error_code(error_code)
        if error_shape is None:
            return
        modeled_parse = parser.parse(response_dict, error_shape)
        # TODO: avoid naming conflicts with ResponseMetadata and Error
        parsed_response.update(modeled_parse)

    def _needs_retry(
        self,
        attempts,
        operation_model,
        request_dict,
        response=None,
        caught_exception=None,
    ):
        """Ask ``needs-retry`` handlers whether to retry, sleeping if so.

        :return: ``False`` when no handler returned a non-``None`` value;
            otherwise ``True`` after sleeping for the value returned by
            the first such handler.
        """
        service_id = operation_model.service_model.service_id.hyphenize()
        event_name = f"needs-retry.{service_id}.{operation_model.name}"
        responses = self._event_emitter.emit(
            event_name,
            response=response,
            endpoint=self,
            operation=operation_model,
            attempts=attempts,
            caught_exception=caught_exception,
            request_dict=request_dict,
        )
        handler_response = first_non_none_response(responses)
        if handler_response is None:
            return False
        # Request needs to be retried, and we need to sleep
        # for the specified number of seconds.
        logger.debug(
            "Response received to retry, sleeping for %s seconds",
            handler_response,
        )
        time.sleep(handler_response)
        return True

    def _send(self, request):
        """Send the prepared request over the HTTP session."""
        return self.http_session.send(request)
384
385
class EndpointCreator:
    """Builds ``Endpoint`` objects that share a single event emitter."""

    def __init__(self, event_emitter):
        self._event_emitter = event_emitter

    def create_endpoint(
        self,
        service_model,
        region_name,
        endpoint_url,
        verify=None,
        response_parser_factory=None,
        timeout=DEFAULT_TIMEOUT,
        max_pool_connections=MAX_POOL_CONNECTIONS,
        http_session_cls=URLLib3Session,
        proxies=None,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
    ):
        """Create an ``Endpoint`` backed by a new HTTP session.

        ``region_name`` is accepted but not used by this method.

        :param service_model: Its ``endpoint_prefix`` names the endpoint.
        :param endpoint_url: Must pass ``is_valid_endpoint_url`` or
            ``is_valid_ipv6_endpoint_url``; passed to ``Endpoint`` as its
            host.
        :param verify: Explicit verify value; ``None`` defers to
            ``_get_verify_value``'s environment lookup.
        :param proxies: Explicit proxies; ``None`` means look them up for
            ``endpoint_url`` via ``_get_proxies``.
        :return: The new ``Endpoint``.
        :raises ValueError: if ``endpoint_url`` fails both validity checks.
        """
        url_is_valid = is_valid_endpoint_url(
            endpoint_url
        ) or is_valid_ipv6_endpoint_url(endpoint_url)
        if not url_is_valid:
            raise ValueError(f"Invalid endpoint: {endpoint_url}")

        if proxies is None:
            proxies = self._get_proxies(endpoint_url)

        prefix = service_model.endpoint_prefix
        logger.debug('Setting %s timeout as %s', prefix, timeout)

        session_kwargs = {
            'timeout': timeout,
            'proxies': proxies,
            'verify': self._get_verify_value(verify),
            'max_pool_connections': max_pool_connections,
            'socket_options': socket_options,
            'client_cert': client_cert,
            'proxies_config': proxies_config,
        }
        session = http_session_cls(**session_kwargs)

        return Endpoint(
            endpoint_url,
            endpoint_prefix=prefix,
            event_emitter=self._event_emitter,
            response_parser_factory=response_parser_factory,
            http_session=session,
        )

    def _get_proxies(self, url):
        # Proxy settings currently come from the environment only; a
        # config file could be supported here as well.
        return get_environ_proxies(url)

    def _get_verify_value(self, verify):
        # We create our own request objects, so REQUESTS_CA_BUNDLE has to
        # be honored here explicitly; see
        # https://github.com/kennethreitz/requests/issues/1436
        if verify is None:
            # Nothing was specified explicitly: fall back to the
            # REQUESTS_CA_BUNDLE env var, or True if it is not set.
            return os.environ.get('REQUESTS_CA_BUNDLE', True)
        # An explicitly specified value always wins.
        return verify