Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/discovery.py: 24%
183 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
14import time
15import weakref
17from botocore import xform_name
18from botocore.exceptions import BotoCoreError, ConnectionError, HTTPClientError
19from botocore.model import OperationNotFoundError
20from botocore.utils import CachedProperty
22logger = logging.getLogger(__name__)
25class EndpointDiscoveryException(BotoCoreError):
26 pass
29class EndpointDiscoveryRequired(EndpointDiscoveryException):
30 """Endpoint Discovery is disabled but is required for this operation."""
32 fmt = 'Endpoint Discovery is not enabled but this operation requires it.'
35class EndpointDiscoveryRefreshFailed(EndpointDiscoveryException):
36 """Endpoint Discovery failed to the refresh the known endpoints."""
38 fmt = 'Endpoint Discovery failed to refresh the required endpoints.'
41def block_endpoint_discovery_required_operations(model, **kwargs):
42 endpoint_discovery = model.endpoint_discovery
43 if endpoint_discovery and endpoint_discovery.get('required'):
44 raise EndpointDiscoveryRequired()
47class EndpointDiscoveryModel:
48 def __init__(self, service_model):
49 self._service_model = service_model
51 @CachedProperty
52 def discovery_operation_name(self):
53 discovery_operation = self._service_model.endpoint_discovery_operation
54 return xform_name(discovery_operation.name)
56 @CachedProperty
57 def discovery_operation_keys(self):
58 discovery_operation = self._service_model.endpoint_discovery_operation
59 keys = []
60 if discovery_operation.input_shape:
61 keys = list(discovery_operation.input_shape.members.keys())
62 return keys
64 def discovery_required_for(self, operation_name):
65 try:
66 operation_model = self._service_model.operation_model(
67 operation_name
68 )
69 return operation_model.endpoint_discovery.get('required', False)
70 except OperationNotFoundError:
71 return False
73 def discovery_operation_kwargs(self, **kwargs):
74 input_keys = self.discovery_operation_keys
75 # Operation and Identifiers are only sent if there are Identifiers
76 if not kwargs.get('Identifiers'):
77 kwargs.pop('Operation', None)
78 kwargs.pop('Identifiers', None)
79 return {k: v for k, v in kwargs.items() if k in input_keys}
81 def gather_identifiers(self, operation, params):
82 return self._gather_ids(operation.input_shape, params)
84 def _gather_ids(self, shape, params, ids=None):
85 # Traverse the input shape and corresponding parameters, gathering
86 # any input fields labeled as an endpoint discovery id
87 if ids is None:
88 ids = {}
89 for member_name, member_shape in shape.members.items():
90 if member_shape.metadata.get('endpointdiscoveryid'):
91 ids[member_name] = params[member_name]
92 elif (
93 member_shape.type_name == 'structure' and member_name in params
94 ):
95 self._gather_ids(member_shape, params[member_name], ids)
96 return ids
99class EndpointDiscoveryManager:
100 def __init__(
101 self, client, cache=None, current_time=None, always_discover=True
102 ):
103 if cache is None:
104 cache = {}
105 self._cache = cache
106 self._failed_attempts = {}
107 if current_time is None:
108 current_time = time.time
109 self._time = current_time
110 self._always_discover = always_discover
112 # This needs to be a weak ref in order to prevent memory leaks on
113 # python 2.6
114 self._client = weakref.proxy(client)
115 self._model = EndpointDiscoveryModel(client.meta.service_model)
117 def _parse_endpoints(self, response):
118 endpoints = response['Endpoints']
119 current_time = self._time()
120 for endpoint in endpoints:
121 cache_time = endpoint.get('CachePeriodInMinutes')
122 endpoint['Expiration'] = current_time + cache_time * 60
123 return endpoints
125 def _cache_item(self, value):
126 if isinstance(value, dict):
127 return tuple(sorted(value.items()))
128 else:
129 return value
131 def _create_cache_key(self, **kwargs):
132 kwargs = self._model.discovery_operation_kwargs(**kwargs)
133 return tuple(self._cache_item(v) for k, v in sorted(kwargs.items()))
135 def gather_identifiers(self, operation, params):
136 return self._model.gather_identifiers(operation, params)
138 def delete_endpoints(self, **kwargs):
139 cache_key = self._create_cache_key(**kwargs)
140 if cache_key in self._cache:
141 del self._cache[cache_key]
143 def _describe_endpoints(self, **kwargs):
144 # This is effectively a proxy to whatever name/kwargs the service
145 # supports for endpoint discovery.
146 kwargs = self._model.discovery_operation_kwargs(**kwargs)
147 operation_name = self._model.discovery_operation_name
148 discovery_operation = getattr(self._client, operation_name)
149 logger.debug('Discovering endpoints with kwargs: %s', kwargs)
150 return discovery_operation(**kwargs)
152 def _get_current_endpoints(self, key):
153 if key not in self._cache:
154 return None
155 now = self._time()
156 return [e for e in self._cache[key] if now < e['Expiration']]
158 def _refresh_current_endpoints(self, **kwargs):
159 cache_key = self._create_cache_key(**kwargs)
160 try:
161 response = self._describe_endpoints(**kwargs)
162 endpoints = self._parse_endpoints(response)
163 self._cache[cache_key] = endpoints
164 self._failed_attempts.pop(cache_key, None)
165 return endpoints
166 except (ConnectionError, HTTPClientError):
167 self._failed_attempts[cache_key] = self._time() + 60
168 return None
170 def _recently_failed(self, cache_key):
171 if cache_key in self._failed_attempts:
172 now = self._time()
173 if now < self._failed_attempts[cache_key]:
174 return True
175 del self._failed_attempts[cache_key]
176 return False
178 def _select_endpoint(self, endpoints):
179 return endpoints[0]['Address']
181 def describe_endpoint(self, **kwargs):
182 operation = kwargs['Operation']
183 discovery_required = self._model.discovery_required_for(operation)
185 if not self._always_discover and not discovery_required:
186 # Discovery set to only run on required operations
187 logger.debug(
188 'Optional discovery disabled. Skipping discovery for Operation: %s'
189 % operation
190 )
191 return None
193 # Get the endpoint for the provided operation and identifiers
194 cache_key = self._create_cache_key(**kwargs)
195 endpoints = self._get_current_endpoints(cache_key)
196 if endpoints:
197 return self._select_endpoint(endpoints)
198 # All known endpoints are stale
199 recently_failed = self._recently_failed(cache_key)
200 if not recently_failed:
201 # We haven't failed to discover recently, go ahead and refresh
202 endpoints = self._refresh_current_endpoints(**kwargs)
203 if endpoints:
204 return self._select_endpoint(endpoints)
205 # Discovery has failed recently, do our best to get an endpoint
206 logger.debug('Endpoint Discovery has failed for: %s', kwargs)
207 stale_entries = self._cache.get(cache_key, None)
208 if stale_entries:
209 # We have stale entries, use those while discovery is failing
210 return self._select_endpoint(stale_entries)
211 if discovery_required:
212 # It looks strange to be checking recently_failed again but,
213 # this informs us as to whether or not we tried to refresh earlier
214 if recently_failed:
215 # Discovery is required and we haven't already refreshed
216 endpoints = self._refresh_current_endpoints(**kwargs)
217 if endpoints:
218 return self._select_endpoint(endpoints)
219 # No endpoints even refresh, raise hard error
220 raise EndpointDiscoveryRefreshFailed()
221 # Discovery is optional, just use the default endpoint for now
222 return None
225class EndpointDiscoveryHandler:
226 def __init__(self, manager):
227 self._manager = manager
229 def register(self, events, service_id):
230 events.register(
231 'before-parameter-build.%s' % service_id, self.gather_identifiers
232 )
233 events.register_first(
234 'request-created.%s' % service_id, self.discover_endpoint
235 )
236 events.register('needs-retry.%s' % service_id, self.handle_retries)
238 def gather_identifiers(self, params, model, context, **kwargs):
239 endpoint_discovery = model.endpoint_discovery
240 # Only continue if the operation supports endpoint discovery
241 if endpoint_discovery is None:
242 return
243 ids = self._manager.gather_identifiers(model, params)
244 context['discovery'] = {'identifiers': ids}
246 def discover_endpoint(self, request, operation_name, **kwargs):
247 ids = request.context.get('discovery', {}).get('identifiers')
248 if ids is None:
249 return
250 endpoint = self._manager.describe_endpoint(
251 Operation=operation_name, Identifiers=ids
252 )
253 if endpoint is None:
254 logger.debug('Failed to discover and inject endpoint')
255 return
256 if not endpoint.startswith('http'):
257 endpoint = 'https://' + endpoint
258 logger.debug('Injecting discovered endpoint: %s', endpoint)
259 request.url = endpoint
261 def handle_retries(self, request_dict, response, operation, **kwargs):
262 if response is None:
263 return None
265 _, response = response
266 status = response.get('ResponseMetadata', {}).get('HTTPStatusCode')
267 error_code = response.get('Error', {}).get('Code')
268 if status != 421 and error_code != 'InvalidEndpointException':
269 return None
271 context = request_dict.get('context', {})
272 ids = context.get('discovery', {}).get('identifiers')
273 if ids is None:
274 return None
276 # Delete the cached endpoints, forcing a refresh on retry
277 # TODO: Improve eviction behavior to only evict the bad endpoint if
278 # there are multiple. This will almost certainly require a lock.
279 self._manager.delete_endpoints(
280 Operation=operation.name, Identifiers=ids
281 )
282 return 0