Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/client.py: 21%
533 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
15from botocore import waiter, xform_name
16from botocore.args import ClientArgsCreator
17from botocore.auth import AUTH_TYPE_MAPS
18from botocore.awsrequest import prepare_request_dict
19from botocore.compress import maybe_compress_request
20from botocore.config import Config
21from botocore.credentials import RefreshableCredentials
22from botocore.discovery import (
23 EndpointDiscoveryHandler,
24 EndpointDiscoveryManager,
25 block_endpoint_discovery_required_operations,
26)
27from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring
28from botocore.exceptions import (
29 DataNotFoundError,
30 InvalidEndpointDiscoveryConfigurationError,
31 OperationNotPageableError,
32 UnknownServiceError,
33 UnknownSignatureVersionError,
34)
35from botocore.history import get_global_history_recorder
36from botocore.hooks import first_non_none_response
37from botocore.httpchecksum import (
38 apply_request_checksum,
39 resolve_checksum_context,
40)
41from botocore.model import ServiceModel
42from botocore.paginate import Paginator
43from botocore.retries import adaptive, standard
44from botocore.useragent import UserAgentString
45from botocore.utils import (
46 CachedProperty,
47 EventbridgeSignerSetter,
48 S3ControlArnParamHandlerv2,
49 S3ExpressIdentityResolver,
50 S3RegionRedirectorv2,
51 ensure_boolean,
52 get_service_module_name,
53)
55# Keep these imported. There's pre-existing code that uses:
56# "from botocore.client import UNSIGNED"
57# "from botocore.client import ClientError"
58# etc.
59from botocore.exceptions import ClientError # noqa
60from botocore.utils import S3ArnParamHandler # noqa
61from botocore.utils import S3ControlArnParamHandler # noqa
62from botocore.utils import S3ControlEndpointSetter # noqa
63from botocore.utils import S3EndpointSetter # noqa
64from botocore.utils import S3RegionRedirector # noqa
65from botocore import UNSIGNED # noqa
68_LEGACY_SIGNATURE_VERSIONS = frozenset(
69 (
70 'v2',
71 'v3',
72 'v3https',
73 'v4',
74 's3',
75 's3v4',
76 )
77)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Recorder used by BaseClient._make_api_call to record 'API_CALL' events.
history_recorder = get_global_history_recorder()
class ClientCreator:
    """Creates client objects for a service."""

    def __init__(
        self,
        loader,
        endpoint_resolver,
        user_agent,
        event_emitter,
        retry_handler_factory,
        retry_config_translator,
        response_parser_factory=None,
        exceptions_factory=None,
        config_store=None,
        user_agent_creator=None,
    ):
        """Store the collaborators used later to build clients.

        Nothing is loaded or resolved here; every argument is simply kept
        on the instance for use by ``create_client`` and its helpers.
        """
        self._loader = loader
        self._endpoint_resolver = endpoint_resolver
        self._user_agent = user_agent
        self._event_emitter = event_emitter
        self._retry_handler_factory = retry_handler_factory
        self._retry_config_translator = retry_config_translator
        self._response_parser_factory = response_parser_factory
        self._exceptions_factory = exceptions_factory
        # TODO: Migrate things away from scoped_config in favor of the
        # config_store.  The config store can pull things from both the scoped
        # config and environment variables (and potentially more in the
        # future).
        self._config_store = config_store
        self._user_agent_creator = user_agent_creator

    def create_client(
        self,
        service_name,
        region_name,
        is_secure=True,
        endpoint_url=None,
        verify=None,
        credentials=None,
        scoped_config=None,
        api_version=None,
        client_config=None,
        auth_token=None,
    ):
        """Build and return a fully wired client for ``service_name``.

        The method emits ``choose-service-name`` (a handler may substitute
        another service name), loads the service model and, when available,
        the endpoint ruleset and partition data, creates the client class,
        instantiates it and finally registers retry, S3, S3 Express,
        S3 Control and endpoint-discovery handlers on the new client.

        :return: The instantiated service client.
        """
        responses = self._event_emitter.emit(
            'choose-service-name', service_name=service_name
        )
        service_name = first_non_none_response(responses, default=service_name)
        service_model = self._load_service_model(service_name, api_version)
        try:
            endpoints_ruleset_data = self._load_service_endpoints_ruleset(
                service_name, api_version
            )
            partition_data = self._loader.load_data('partitions')
        except UnknownServiceError:
            # No ruleset for this service: continue without ruleset and
            # partition data.
            endpoints_ruleset_data = None
            partition_data = None
            logger.info(
                'No endpoints ruleset found for service %s, falling back to '
                'legacy endpoint routing.',
                service_name,
            )

        cls = self._create_client_class(service_name, service_model)
        region_name, client_config = self._normalize_fips_region(
            region_name, client_config
        )
        endpoint_bridge = ClientEndpointBridge(
            self._endpoint_resolver,
            scoped_config,
            client_config,
            service_signing_name=service_model.metadata.get('signingName'),
            config_store=self._config_store,
            service_signature_version=service_model.metadata.get(
                'signatureVersion'
            ),
        )
        client_args = self._get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )
        service_client = cls(**client_args)
        self._register_retries(service_client)
        self._register_s3_events(
            client=service_client,
            endpoint_bridge=None,
            endpoint_url=None,
            client_config=client_config,
            scoped_config=scoped_config,
        )
        self._register_s3express_events(client=service_client)
        self._register_s3_control_events(client=service_client)
        self._register_endpoint_discovery(
            service_client, endpoint_url, client_config
        )
        return service_client

    def create_client_class(self, service_name, api_version=None):
        """Return the client class for a service without instantiating it."""
        service_model = self._load_service_model(service_name, api_version)
        return self._create_client_class(service_name, service_model)

    def _create_client_class(self, service_name, service_model):
        """Dynamically build the client class for ``service_model``.

        The ``creating-client-class.<service-id>`` event is emitted with the
        attribute dict and base-class list so handlers can modify either
        before the class is created.
        """
        class_attributes = self._create_methods(service_model)
        py_name_to_operation_name = self._create_name_mapping(service_model)
        class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
        bases = [BaseClient]
        service_id = service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f'creating-client-class.{service_id}',
            class_attributes=class_attributes,
            base_classes=bases,
        )
        class_name = get_service_module_name(service_model)
        cls = type(str(class_name), tuple(bases), class_attributes)
        return cls

    def _normalize_fips_region(self, region_name, client_config):
        """Strip a ``fips-`` / ``-fips`` marker from ``region_name``.

        When the region name changes as a result, ``use_fips_endpoint=True``
        is merged into (or used as) the client config and a warning is
        logged.

        :return: ``(region_name, client_config)``, possibly updated.
        """
        if region_name is not None:
            normalized_region_name = region_name.replace('fips-', '').replace(
                '-fips', ''
            )
            # If region has been transformed then set flag
            if normalized_region_name != region_name:
                config_use_fips_endpoint = Config(use_fips_endpoint=True)
                if client_config:
                    # Keeping endpoint setting client specific
                    client_config = client_config.merge(
                        config_use_fips_endpoint
                    )
                else:
                    client_config = config_use_fips_endpoint
                # Lazy %-args: the message is only rendered if the record
                # is actually emitted.
                logger.warning(
                    'transforming region from %s to %s and setting '
                    'use_fips_endpoint to true. client should not '
                    'be configured with a fips psuedo region.',
                    region_name,
                    normalized_region_name,
                )
                region_name = normalized_region_name
        return region_name, client_config

    def _load_service_model(self, service_name, api_version=None):
        """Load the ``service-2`` data and wrap it in a ``ServiceModel``."""
        json_model = self._loader.load_service_model(
            service_name, 'service-2', api_version=api_version
        )
        service_model = ServiceModel(json_model, service_name=service_name)
        return service_model

    def _load_service_endpoints_ruleset(self, service_name, api_version=None):
        """Load the ``endpoint-rule-set-1`` data for a service."""
        return self._loader.load_service_model(
            service_name, 'endpoint-rule-set-1', api_version=api_version
        )

    def _register_retries(self, client):
        """Register retry handlers according to the client's retry mode.

        ``adaptive`` registers the standard handlers as well as the adaptive
        ones.  Any mode other than standard/adaptive/legacy registers
        nothing.
        """
        retry_mode = client.meta.config.retries['mode']
        if retry_mode == 'standard':
            self._register_v2_standard_retries(client)
        elif retry_mode == 'adaptive':
            self._register_v2_standard_retries(client)
            self._register_v2_adaptive_retries(client)
        elif retry_mode == 'legacy':
            self._register_legacy_retries(client)

    def _register_v2_standard_retries(self, client):
        """Register the standard retry handler.

        ``max_attempts`` is forwarded only when ``total_max_attempts`` is
        configured on the client.
        """
        max_attempts = client.meta.config.retries.get('total_max_attempts')
        kwargs = {'client': client}
        if max_attempts is not None:
            kwargs['max_attempts'] = max_attempts
        standard.register_retry_handler(**kwargs)

    def _register_v2_adaptive_retries(self, client):
        """Register the adaptive retry handler on ``client``."""
        adaptive.register_retry_handler(client)

    def _register_legacy_retries(self, client):
        """Build and register the legacy ``needs-retry`` handler.

        Does nothing when the loader returns no ``_retry`` data.
        """
        endpoint_prefix = client.meta.service_model.endpoint_prefix
        service_id = client.meta.service_model.service_id
        service_event_name = service_id.hyphenize()

        # First, we load the entire retry config for all services,
        # then pull out just the information we need.
        original_config = self._loader.load_data('_retry')
        if not original_config:
            return

        retries = self._transform_legacy_retries(client.meta.config.retries)
        retry_config = self._retry_config_translator.build_retry_config(
            endpoint_prefix,
            original_config.get('retry', {}),
            original_config.get('definitions', {}),
            retries,
        )

        logger.debug(
            "Registering retry handlers for service: %s",
            client.meta.service_model.service_name,
        )
        handler = self._retry_handler_factory.create_retry_handler(
            retry_config, endpoint_prefix
        )
        unique_id = f'retry-config-{service_event_name}'
        client.meta.events.register(
            f"needs-retry.{service_event_name}", handler, unique_id=unique_id
        )

    def _transform_legacy_retries(self, retries):
        """Return a copy of ``retries`` in the legacy key layout.

        ``total_max_attempts`` (if present) is replaced by ``max_attempts``
        with a value one lower.  The input mapping is never mutated.

        :return: The converted copy, or ``None`` when ``retries`` is ``None``.
        """
        if retries is None:
            return None
        copied_args = retries.copy()
        if 'total_max_attempts' in copied_args:
            copied_args['max_attempts'] = (
                copied_args.pop('total_max_attempts') - 1
            )
        return copied_args

    def _get_retry_mode(self, client, config_store):
        """Return the retry mode from the client config or the config store.

        Falls back to ``'legacy'`` when neither provides a value.
        """
        client_retries = client.meta.config.retries
        if (
            client_retries is not None
            and client_retries.get('mode') is not None
        ):
            return client_retries['mode']
        return config_store.get_config_variable('retry_mode') or 'legacy'

    def _register_endpoint_discovery(self, client, endpoint_url, config):
        """Attach endpoint-discovery handlers when applicable.

        Nothing is registered for a custom ``endpoint_url`` or for services
        without an endpoint discovery operation.  When discovery ends up
        disabled, a ``before-parameter-build`` handler is registered that
        blocks operations requiring discovery.
        """
        if endpoint_url is not None:
            # Don't register any handlers in the case of a custom endpoint url
            return
        # Only attach handlers if the service supports discovery
        if client.meta.service_model.endpoint_discovery_operation is None:
            return
        events = client.meta.events
        service_id = client.meta.service_model.service_id.hyphenize()
        enabled = False
        # An explicit client config value wins over the config store.
        if config and config.endpoint_discovery_enabled is not None:
            enabled = config.endpoint_discovery_enabled
        elif self._config_store:
            enabled = self._config_store.get_config_variable(
                'endpoint_discovery_enabled'
            )

        enabled = self._normalize_endpoint_discovery_config(enabled)
        if enabled and self._requires_endpoint_discovery(client, enabled):
            # 'auto' is truthy but is not ``True``: always_discover is only
            # set when discovery was explicitly enabled.
            discover = enabled is True
            manager = EndpointDiscoveryManager(
                client, always_discover=discover
            )
            handler = EndpointDiscoveryHandler(manager)
            handler.register(events, service_id)
        else:
            events.register(
                'before-parameter-build',
                block_endpoint_discovery_required_operations,
            )

    def _normalize_endpoint_discovery_config(self, enabled):
        """Config must either be a boolean-string or string-literal 'auto'

        :return: ``'auto'`` or a ``bool``.
        :raises InvalidEndpointDiscoveryConfigurationError: for any other
            value.
        """
        if isinstance(enabled, str):
            enabled = enabled.lower().strip()
            if enabled == 'auto':
                return enabled
            elif enabled in ('true', 'false'):
                return ensure_boolean(enabled)
        elif isinstance(enabled, bool):
            return enabled

        raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)

    def _requires_endpoint_discovery(self, client, enabled):
        """Return whether discovery handlers should be attached.

        For ``'auto'`` the service model's ``endpoint_discovery_required``
        decides; otherwise ``enabled`` is returned unchanged.
        """
        if enabled == "auto":
            return client.meta.service_model.endpoint_discovery_required
        return enabled

    def _register_eventbridge_events(
        self, client, endpoint_bridge, endpoint_url
    ):
        """Register the EventBridge signer setter for the ``events`` service.

        ``endpoint_bridge`` is accepted but not used.
        """
        if client.meta.service_model.service_name != 'events':
            return
        EventbridgeSignerSetter(
            endpoint_resolver=self._endpoint_resolver,
            region=client.meta.region_name,
            endpoint_url=endpoint_url,
        ).register(client.meta.events)

    def _register_s3express_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the S3 Express identity resolver for ``s3`` clients.

        Only ``client`` is used; the other parameters are accepted but
        ignored.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3ExpressIdentityResolver(client, RefreshableCredentials).register()

    def _register_s3_events(
        self,
        client,
        endpoint_bridge,
        endpoint_url,
        client_config,
        scoped_config,
    ):
        """Register S3-specific handlers for ``s3`` clients.

        Registers the region redirector and configures the default presign
        signature version.  ``endpoint_bridge`` and ``endpoint_url`` are
        accepted but not used.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3RegionRedirectorv2(None, client).register()
        self._set_s3_presign_signature_version(
            client.meta, client_config, scoped_config
        )

    def _register_s3_control_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the ARN parameter handler for ``s3control`` clients.

        Only ``client`` is used; the other parameters are accepted but
        ignored.
        """
        if client.meta.service_model.service_name != 's3control':
            return
        S3ControlArnParamHandlerv2().register(client.meta.events)

    def _set_s3_presign_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Default S3 presigning to sigv2 where that is still supported.

        Registers ``_default_s3_presign_to_sigv2`` on ``choose-signer.s3``
        only when no signature version was configured, the region is known
        (or is ``aws-global``) and the endpoint lists ``'s3'`` among its
        signature versions.
        """
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        provided_signature_version = _get_configured_signature_version(
            's3', client_config, scoped_config
        )
        if provided_signature_version is not None:
            return

        # Check to see if the region is a region that we know about. If we
        # don't know about a region, then we can safely assume it's a new
        # region that is sigv4 only, since all new S3 regions only allow sigv4.
        # The only exception is aws-global. This is a pseudo-region for the
        # global endpoint, we should respect the signature versions it
        # supports, which includes v2.
        regions = self._endpoint_resolver.get_available_endpoints(
            's3', client_meta.partition
        )
        if (
            client_meta.region_name != 'aws-global'
            and client_meta.region_name not in regions
        ):
            return

        # If it is a region we know about, we want to default to sigv2, so here
        # we check to see if it is available.
        endpoint = self._endpoint_resolver.construct_endpoint(
            's3', client_meta.region_name
        )
        signature_versions = endpoint['signatureVersions']
        if 's3' not in signature_versions:
            return

        # We now know that we're in a known region that supports sigv2 and
        # the customer hasn't set a signature version so we default the
        # signature version to sigv2.
        client_meta.events.register(
            'choose-signer.s3', self._default_s3_presign_to_sigv2
        )

    def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
        """
        Returns the 's3' (sigv2) signer if presigning an s3 request. This is
        intended to be used to set the default signature version for the signer
        to sigv2. Situations where an asymmetric signature is required are the
        exception, for example MRAP needs v4a.

        :type signature_version: str
        :param signature_version: The current client signature version.

        :param kwargs: Any additional keyword arguments supplied by the
            event; they are accepted and ignored.

        :return: ``None`` for ``v4a*`` versions, the unchanged value for
            ``v4-s3express*`` versions, ``'s3-query'`` / ``'s3-presign-post'``
            when the version ends with the matching suffix, ``None``
            otherwise.
        """
        if signature_version.startswith('v4a'):
            return None

        if signature_version.startswith('v4-s3express'):
            return signature_version

        for suffix in ('-query', '-presign-post'):
            if signature_version.endswith(suffix):
                return f's3{suffix}'
        return None

    def _get_client_args(
        self,
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    ):
        """Delegate to ``ClientArgsCreator`` to compute constructor kwargs.

        The return value is unpacked as keyword arguments into the client
        class by ``create_client``.
        """
        args_creator = ClientArgsCreator(
            self._event_emitter,
            self._user_agent,
            self._response_parser_factory,
            self._loader,
            self._exceptions_factory,
            config_store=self._config_store,
            user_agent_creator=self._user_agent_creator,
        )
        return args_creator.get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )

    def _create_methods(self, service_model):
        """Return ``{py_operation_name: api_method}`` for every operation."""
        op_dict = {}
        for operation_name in service_model.operation_names:
            py_operation_name = xform_name(operation_name)
            op_dict[py_operation_name] = self._create_api_method(
                py_operation_name, operation_name, service_model
            )
        return op_dict

    def _create_name_mapping(self, service_model):
        """Return the ``py_name -> OperationName`` mapping for a service."""
        # py_name -> OperationName, for every operation available
        # for a service.
        return {
            xform_name(operation_name): operation_name
            for operation_name in service_model.operation_names
        }

    def _create_api_method(
        self, py_operation_name, operation_name, service_model
    ):
        """Create the client method that invokes ``operation_name``.

        The returned function accepts keyword arguments only, is named
        ``py_operation_name`` and carries a ``ClientMethodDocstring`` as its
        ``__doc__``.
        """

        def _api_call(self, *args, **kwargs):
            # We're accepting *args so that we can give a more helpful
            # error message than TypeError: _api_call takes exactly
            # 1 argument.
            if args:
                raise TypeError(
                    f"{py_operation_name}() only accepts keyword arguments."
                )
            # The "self" in this scope is referring to the BaseClient.
            return self._make_api_call(operation_name, kwargs)

        _api_call.__name__ = str(py_operation_name)

        # Add the docstring to the client method
        operation_model = service_model.operation_model(operation_name)
        docstring = ClientMethodDocstring(
            operation_model=operation_model,
            method_name=operation_name,
            event_emitter=self._event_emitter,
            method_description=operation_model.documentation,
            example_prefix=f'response = client.{py_operation_name}',
            include_signature=False,
        )
        _api_call.__doc__ = docstring
        return _api_call
class ClientEndpointBridge:
    """Bridges endpoint data and client creation

    This class handles taking out the relevant arguments from the endpoint
    resolver and determining which values to use, taking into account any
    client configuration options and scope configuration options.

    This class also handles determining what, if any, region to use if no
    explicit region setting is provided. For example, Amazon S3 client will
    utilize "us-east-1" by default if no region can be resolved."""

    # Hostname template used by ``_assume_endpoint`` when the resolver
    # returns nothing.
    DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
    # Services whose dual-stack setting may come from the ``s3`` config
    # section (see ``_is_s3_dualstack_mode``).
    _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']

    def __init__(
        self,
        endpoint_resolver,
        scoped_config=None,
        client_config=None,
        default_endpoint=None,
        service_signing_name=None,
        config_store=None,
        service_signature_version=None,
    ):
        """Store the resolver and configuration sources.

        ``default_endpoint`` falls back to ``DEFAULT_ENDPOINT`` when it is
        not provided (or is falsy).
        """
        self.service_signing_name = service_signing_name
        self.endpoint_resolver = endpoint_resolver
        self.scoped_config = scoped_config
        self.client_config = client_config
        self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
        self.config_store = config_store
        self.service_signature_version = service_signature_version

    def resolve(
        self, service_name, region_name=None, endpoint_url=None, is_secure=True
    ):
        """Resolve endpoint information for a service/region pair.

        The resolver is queried first as-is and, if that yields nothing,
        again with ``partition_name='aws'``.  If both attempts fail, an
        endpoint is assumed from the default hostname template.

        :return: A dict with the keys ``service_name``, ``region_name``,
            ``signing_region``, ``signing_name``, ``endpoint_url``,
            ``signature_version`` and ``metadata``.
        """
        region_name = self._check_default_region(service_name, region_name)
        use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
            service_name
        )
        use_fips_endpoint = self._resolve_endpoint_variant_config_var(
            'use_fips_endpoint'
        )
        resolved = self.endpoint_resolver.construct_endpoint(
            service_name,
            region_name,
            use_dualstack_endpoint=use_dualstack_endpoint,
            use_fips_endpoint=use_fips_endpoint,
        )

        # If we can't resolve the region, we'll attempt to get a global
        # endpoint for non-regionalized services (iam, route53, etc)
        if not resolved:
            # TODO: fallback partition_name should be configurable in the
            # future for users to define as needed.
            resolved = self.endpoint_resolver.construct_endpoint(
                service_name,
                region_name,
                partition_name='aws',
                use_dualstack_endpoint=use_dualstack_endpoint,
                use_fips_endpoint=use_fips_endpoint,
            )

        if resolved:
            return self._create_endpoint(
                resolved, service_name, region_name, endpoint_url, is_secure
            )
        else:
            return self._assume_endpoint(
                service_name, region_name, endpoint_url, is_secure
            )

    def resolver_uses_builtin_data(self):
        """Return the resolver's ``uses_builtin_data`` attribute."""
        return self.endpoint_resolver.uses_builtin_data

    def _check_default_region(self, service_name, region_name):
        """Return the region to use, or ``None`` if none is configured.

        An explicit ``region_name`` wins; otherwise the client config's
        ``region_name`` is used when set.  ``service_name`` is not consulted.
        """
        if region_name is not None:
            return region_name
        # Use the client_config region if no explicit region was provided.
        if self.client_config and self.client_config.region_name is not None:
            return self.client_config.region_name
        return None

    def _create_endpoint(
        self, resolved, service_name, region_name, endpoint_url, is_secure
    ):
        """Build the result dict from data returned by the resolver.

        The URL is derived from the resolved hostname only when the caller
        did not supply ``endpoint_url``.
        """
        region_name, signing_region = self._pick_region_values(
            resolved, region_name, endpoint_url
        )
        if endpoint_url is None:
            endpoint_url = self._make_url(
                resolved.get('hostname'),
                is_secure,
                resolved.get('protocols', []),
            )
        signature_version = self._resolve_signature_version(
            service_name, resolved
        )
        signing_name = self._resolve_signing_name(service_name, resolved)
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=signing_region,
            signing_name=signing_name,
            endpoint_url=endpoint_url,
            metadata=resolved,
            signature_version=signature_version,
        )

    def _resolve_endpoint_variant_config_var(self, config_var):
        """Look up an endpoint-variant flag such as ``use_fips_endpoint``.

        A non-``None`` attribute on the client config takes precedence;
        otherwise the config store is consulted.  ``False`` is returned when
        neither source is available.
        """
        client_config = self.client_config
        config_val = False

        # Client configuration arg has precedence
        if client_config and getattr(client_config, config_var) is not None:
            return getattr(client_config, config_var)
        elif self.config_store is not None:
            # Check config store
            config_val = self.config_store.get_config_variable(config_var)
        return config_val

    def _resolve_use_dualstack_endpoint(self, service_name):
        """Return the dual-stack setting for ``service_name``.

        The S3-specific setting is used when it yields a value; otherwise
        the generic ``use_dualstack_endpoint`` variable is resolved.
        """
        s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
        if s3_dualstack_mode is not None:
            return s3_dualstack_mode
        return self._resolve_endpoint_variant_config_var(
            'use_dualstack_endpoint'
        )

    def _is_s3_dualstack_mode(self, service_name):
        """Return the dual-stack setting from the ``s3`` config sections.

        :return: The client config's ``s3['use_dualstack_endpoint']`` value
            when present; ``True`` when the scoped config enables it;
            ``None`` otherwise (including for services outside
            ``_DUALSTACK_CUSTOMIZED_SERVICES``).
        """
        if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
            return None
        # TODO: This normalization logic is duplicated from the
        # ClientArgsCreator class. Consolidate everything to
        # ClientArgsCreator.  _resolve_signature_version also has similarly
        # duplicated logic.
        client_config = self.client_config
        if (
            client_config is not None
            and client_config.s3 is not None
            and 'use_dualstack_endpoint' in client_config.s3
        ):
            # Client config trumps scoped config.
            return client_config.s3['use_dualstack_endpoint']
        if self.scoped_config is not None:
            s3_scoped_config = self.scoped_config.get('s3')
            # A malformed ``s3`` entry (a plain string or an explicit None
            # rather than a mapping) is ignored instead of failing with
            # AttributeError on ``.get``.
            if isinstance(s3_scoped_config, dict):
                enabled = s3_scoped_config.get('use_dualstack_endpoint')
                if enabled in [True, 'True', 'true']:
                    return True
        return None

    def _assume_endpoint(
        self, service_name, region_name, endpoint_url, is_secure
    ):
        """Build a result dict without any resolver data.

        Used when the resolver could not produce an endpoint: the hostname
        comes from ``default_endpoint`` (unless ``endpoint_url`` is given)
        and ``v4`` is offered as the only endpoint signature version.
        """
        if endpoint_url is None:
            # Expand the default hostname URI template.
            hostname = self.default_endpoint.format(
                service=service_name, region=region_name
            )
            endpoint_url = self._make_url(
                hostname, is_secure, ['http', 'https']
            )
        logger.debug(
            'Assuming an endpoint for %s, %s: %s',
            service_name,
            region_name,
            endpoint_url,
        )
        # We still want to allow the user to provide an explicit version.
        signature_version = self._resolve_signature_version(
            service_name, {'signatureVersions': ['v4']}
        )
        signing_name = self._resolve_signing_name(service_name, resolved={})
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=region_name,
            signing_name=signing_name,
            signature_version=signature_version,
            endpoint_url=endpoint_url,
            metadata={},
        )

    def _create_result(
        self,
        service_name,
        region_name,
        signing_region,
        signing_name,
        endpoint_url,
        signature_version,
        metadata,
    ):
        """Pack the resolved values into the dict returned by ``resolve``."""
        return {
            'service_name': service_name,
            'region_name': region_name,
            'signing_region': signing_region,
            'signing_name': signing_name,
            'endpoint_url': endpoint_url,
            'signature_version': signature_version,
            'metadata': metadata,
        }

    def _make_url(self, hostname, is_secure, supported_protocols):
        """Return ``scheme://hostname``.

        ``https`` is chosen only when ``is_secure`` is truthy and ``https``
        is among ``supported_protocols``; otherwise ``http`` is used.
        """
        if is_secure and 'https' in supported_protocols:
            scheme = 'https'
        else:
            scheme = 'http'
        return f'{scheme}://{hostname}'

    def _resolve_signing_name(self, service_name, resolved):
        """Pick the signing name for a service.

        Order of preference: the resolved ``credentialScope`` service, the
        signing name given at construction time, then ``service_name``.
        """
        # CredentialScope overrides everything else.
        if (
            'credentialScope' in resolved
            and 'service' in resolved['credentialScope']
        ):
            return resolved['credentialScope']['service']
        # Use the signingName from the model if present.
        if self.service_signing_name:
            return self.service_signing_name
        # Just assume is the same as the service name.
        return service_name

    def _pick_region_values(self, resolved, region_name, endpoint_url):
        """Return ``(region_name, signing_region)`` for a resolved endpoint.

        With an explicit ``endpoint_url`` both values are the caller's
        ``region_name``; otherwise they come from the resolved endpoint.
        """
        signing_region = region_name
        if endpoint_url is None:
            # Do not use the region name or signing name from the resolved
            # endpoint if the user explicitly provides an endpoint_url. This
            # would happen if we resolve to an endpoint where the service has
            # a "defaults" section that overrides all endpoint with a single
            # hostname and credentialScope. This has been the case historically
            # for how STS has worked. The only way to resolve an STS endpoint
            # was to provide a region_name and an endpoint_url. In that case,
            # we would still resolve an endpoint, but we would not use the
            # resolved endpointName or signingRegion because we want to allow
            # custom endpoints.
            region_name = resolved['endpointName']
            signing_region = region_name
            if (
                'credentialScope' in resolved
                and 'region' in resolved['credentialScope']
            ):
                signing_region = resolved['credentialScope']['region']
        return region_name, signing_region

    def _resolve_signature_version(self, service_name, resolved):
        """Choose the signature version for a service.

        A version configured by the user always wins.  Otherwise candidates
        come from the endpoint metadata, or from the service model when it
        declares a version outside ``_LEGACY_SIGNATURE_VERSIONS``.

        :raises UnknownSignatureVersionError: when no candidate is usable.
        """
        configured_version = _get_configured_signature_version(
            service_name, self.client_config, self.scoped_config
        )
        if configured_version is not None:
            return configured_version

        potential_versions = resolved.get('signatureVersions', [])
        if (
            self.service_signature_version is not None
            and self.service_signature_version
            not in _LEGACY_SIGNATURE_VERSIONS
        ):
            # Prefer the service model as most specific
            # source of truth for new signature versions.
            potential_versions = [self.service_signature_version]

        # Pick a signature version from the endpoint metadata if present.
        if 'signatureVersions' in resolved:
            if service_name == 's3':
                return 's3v4'
            if 'v4' in potential_versions:
                return 'v4'
            # Now just iterate over the signature versions in order until we
            # find the first one that is known to Botocore.
            for known in potential_versions:
                if known in AUTH_TYPE_MAPS:
                    return known
        raise UnknownSignatureVersionError(
            signature_version=potential_versions
        )
844class BaseClient:
845 # This is actually reassigned with the py->op_name mapping
846 # when the client creator creates the subclass. This value is used
847 # because calls such as client.get_paginator('list_objects') use the
848 # snake_case name, but we need to know the ListObjects form.
849 # xform_name() does the ListObjects->list_objects conversion, but
850 # we need the reverse mapping here.
851 _PY_TO_OP_NAME = {}
853 def __init__(
854 self,
855 serializer,
856 endpoint,
857 response_parser,
858 event_emitter,
859 request_signer,
860 service_model,
861 loader,
862 client_config,
863 partition,
864 exceptions_factory,
865 endpoint_ruleset_resolver=None,
866 user_agent_creator=None,
867 ):
868 self._serializer = serializer
869 self._endpoint = endpoint
870 self._ruleset_resolver = endpoint_ruleset_resolver
871 self._response_parser = response_parser
872 self._request_signer = request_signer
873 self._cache = {}
874 self._loader = loader
875 self._client_config = client_config
876 self.meta = ClientMeta(
877 event_emitter,
878 self._client_config,
879 endpoint.host,
880 service_model,
881 self._PY_TO_OP_NAME,
882 partition,
883 )
884 self._exceptions_factory = exceptions_factory
885 self._exceptions = None
886 self._user_agent_creator = user_agent_creator
887 if self._user_agent_creator is None:
888 self._user_agent_creator = (
889 UserAgentString.from_environment().with_client_config(
890 self._client_config
891 )
892 )
893 self._register_handlers()
895 def __getattr__(self, item):
896 service_id = self._service_model.service_id.hyphenize()
897 event_name = f'getattr.{service_id}.{item}'
899 handler, event_response = self.meta.events.emit_until_response(
900 event_name, client=self
901 )
903 if event_response is not None:
904 return event_response
906 raise AttributeError(
907 f"'{self.__class__.__name__}' object has no attribute '{item}'"
908 )
910 def close(self):
911 """Closes underlying endpoint connections."""
912 self._endpoint.close()
914 def _register_handlers(self):
915 # Register the handler required to sign requests.
916 service_id = self.meta.service_model.service_id.hyphenize()
917 self.meta.events.register(
918 f"request-created.{service_id}", self._request_signer.handler
919 )
921 @property
922 def _service_model(self):
923 return self.meta.service_model
def _make_api_call(self, operation_name, api_params):
    """Run a single API operation and return its parsed response.

    The call proceeds in these steps:

    1. Look up the operation model and record an ``API_CALL`` entry with
       the history recorder.
    2. Build the request context and let event handlers adjust the
       parameters (``_emit_api_params``).
    3. Resolve the endpoint (``_resolve_endpoint_ruleset``) and build the
       request dict (``_convert_to_request_dict``).
    4. Emit ``before-call.<service-id>.<operation>``. If a handler returns
       a response it is used as the ``(http, parsed_response)`` pair and
       ``_make_request`` is not called; otherwise the request is
       compressed if applicable, checksummed and sent.
    5. Emit ``after-call.<service-id>.<operation>``.

    :param operation_name: Name of the operation, as understood by
        ``self._service_model.operation_model``.
    :param api_params: Parameters supplied for the operation.

    :return: The parsed response when the HTTP status code is below 300.
    :raise: The error class returned by ``self.exceptions.from_code`` for
        the response's error code when the status code is 300 or above.
    """
    operation_model = self._service_model.operation_model(operation_name)
    service_name = self._service_model.service_name
    history_recorder.record(
        'API_CALL',
        {
            'service': service_name,
            'operation': operation_name,
            'params': api_params,
        },
    )
    if operation_model.deprecated:
        logger.debug(
            'Warning: %s.%s() is deprecated', service_name, operation_name
        )
    request_context = {
        'client_region': self.meta.region_name,
        'client_config': self.meta.config,
        'has_streaming_input': operation_model.has_streaming_input,
        'auth_type': operation_model.auth_type,
    }
    api_params = self._emit_api_params(
        api_params=api_params,
        operation_model=operation_model,
        context=request_context,
    )
    (
        endpoint_url,
        additional_headers,
        properties,
    ) = self._resolve_endpoint_ruleset(
        operation_model, api_params, request_context
    )
    if properties:
        # Pass arbitrary endpoint info with the Request
        # for use during construction.
        request_context['endpoint_properties'] = properties
    request_dict = self._convert_to_request_dict(
        api_params=api_params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=request_context,
        headers=additional_headers,
    )
    resolve_checksum_context(request_dict, operation_model, api_params)

    service_id = self._service_model.service_id.hyphenize()
    # Only the response half of the (handler, response) pair is needed.
    _handler, event_response = self.meta.events.emit_until_response(
        f'before-call.{service_id}.{operation_name}',
        model=operation_model,
        params=request_dict,
        request_signer=self._request_signer,
        context=request_context,
    )

    if event_response is not None:
        # A before-call handler supplied the response; nothing is sent.
        http, parsed_response = event_response
    else:
        maybe_compress_request(
            self.meta.config, request_dict, operation_model
        )
        apply_request_checksum(request_dict)
        http, parsed_response = self._make_request(
            operation_model, request_dict, request_context
        )

    self.meta.events.emit(
        f'after-call.{service_id}.{operation_name}',
        http_response=http,
        parsed=parsed_response,
        model=operation_model,
        context=request_context,
    )

    if http.status_code >= 300:
        error_info = parsed_response.get("Error", {})
        # "QueryErrorCode" takes precedence over "Code" when it is set.
        error_code = error_info.get("QueryErrorCode") or error_info.get(
            "Code"
        )
        error_class = self.exceptions.from_code(error_code)
        raise error_class(parsed_response, operation_name)
    return parsed_response
def _make_request(self, operation_model, request_dict, request_context):
    """Send the request through the endpoint, announcing any failure.

    :param operation_model: Model of the operation being called; its
        ``name`` is used in the error event name.
    :param request_dict: The request dict handed to the endpoint.
    :param request_context: Context dict passed to the error event.

    :return: Whatever ``self._endpoint.make_request`` returns.
    :raise: Any exception raised by the endpoint is re-raised unchanged
        after ``after-call-error.<service-id>.<operation>`` has been
        emitted with the exception and the request context.
    """
    try:
        return self._endpoint.make_request(operation_model, request_dict)
    except Exception as e:
        service_id = self._service_model.service_id.hyphenize()
        self.meta.events.emit(
            f'after-call-error.{service_id}.{operation_model.name}',
            exception=e,
            context=request_context,
        )
        raise
def _convert_to_request_dict(
    self,
    api_params,
    operation_model,
    endpoint_url,
    context=None,
    headers=None,
    set_user_agent_header=True,
):
    """Serialize the API parameters and prepare the resulting request dict.

    :param api_params: Parameters to serialize.
    :param operation_model: Model of the operation being called.
    :param endpoint_url: Endpoint URL passed to ``prepare_request_dict``.
    :param context: Optional request context passed to
        ``prepare_request_dict``.
    :param headers: Optional mapping merged into the serialized request's
        ``'headers'``.
    :param set_user_agent_header: When true, the user agent string from
        ``self._user_agent_creator`` is passed along; otherwise ``None``
        is passed as the user agent.

    :return: The prepared request dict.
    """
    serialized = self._serializer.serialize_to_request(
        api_params, operation_model
    )
    # Drop the host prefix when the client config disables its injection.
    if not self._client_config.inject_host_prefix:
        serialized.pop('host_prefix', None)
    if headers is not None:
        serialized['headers'].update(headers)
    agent = (
        self._user_agent_creator.to_string()
        if set_user_agent_header
        else None
    )
    prepare_request_dict(
        serialized,
        endpoint_url=endpoint_url,
        user_agent=agent,
        context=context,
    )
    return serialized
def _emit_api_params(self, api_params, operation_model, context):
    """Give event handlers a chance to adjust the API parameters.

    Two events are emitted, both suffixed with
    ``<service-id>.<operation-name>``:

    * ``provide-client-params``: the first non-``None`` handler response
      replaces ``api_params``; if there is none, the parameters passed in
      are kept.
    * ``before-parameter-build``: emitted with the resulting parameters;
      its responses are not used here.

    :param api_params: Parameters provided for the operation.
    :param operation_model: Model of the operation being called.
    :param context: Request context forwarded to the handlers.

    :return: The parameters to use for the call.
    """
    hyphenized_id = self._service_model.service_id.hyphenize()
    event_suffix = f'{hyphenized_id}.{operation_model.name}'

    # Handlers may modify the existing parameters or return a new set of
    # parameters to use.
    handler_responses = self.meta.events.emit(
        f'provide-client-params.{event_suffix}',
        params=api_params,
        model=operation_model,
        context=context,
    )
    chosen_params = first_non_none_response(
        handler_responses, default=api_params
    )

    self.meta.events.emit(
        f'before-parameter-build.{event_suffix}',
        params=chosen_params,
        model=operation_model,
        context=context,
    )
    return chosen_params
def _resolve_endpoint_ruleset(
    self,
    operation_model,
    params,
    request_context,
    ignore_signing_region=False,
):
    """Resolve the endpoint for the given operation and parameters.

    When no ruleset resolver is available (``self._ruleset_resolver`` is
    ``None``), for example because the service has no endpoints ruleset
    file, the client's configured endpoint URL is returned together with
    empty headers and properties.

    Otherwise the ruleset resolver constructs the endpoint. If the
    endpoint's properties contain ``authSchemes``, ``request_context`` is
    modified in place: ``'auth_type'`` is overwritten and the signing
    context is merged into (or stored as) ``request_context['signing']``.

    Pass ``ignore_signing_region=True`` when generating presigned URLs or
    in any other situation where the signing region from the ruleset
    resolver should be ignored; the ``'region'`` entry is then removed
    from the signing context before it is applied.

    :return: A tuple of ``(endpoint_url, additional_headers,
        endpoint_properties)``.
    """
    resolver = self._ruleset_resolver
    if resolver is None:
        return self.meta.endpoint_url, {}, {}

    resolved = resolver.construct_endpoint(
        operation_model=operation_model,
        call_args=params,
        request_context=request_context,
    )
    result = (resolved.url, resolved.headers, resolved.properties)

    # If authSchemes is present, overwrite default auth type and
    # signing context derived from service model.
    schemes = resolved.properties.get('authSchemes')
    if schemes is None:
        return result

    auth_type, signing_ctx = resolver.auth_schemes_to_signing_ctx(schemes)
    request_context['auth_type'] = auth_type
    if 'region' in signing_ctx and ignore_signing_region:
        del signing_ctx['region']
    if 'signing' not in request_context:
        request_context['signing'] = signing_ctx
    else:
        request_context['signing'].update(signing_ctx)
    return result
def get_paginator(self, operation_name):
    """Build a paginator object for a pageable operation.

    :type operation_name: string
    :param operation_name: The operation name. This is the same name
        as the method name on the client. For example, if the
        method name is ``create_foo``, and you'd normally invoke the
        operation as ``client.create_foo(**kwargs)``, if the
        ``create_foo`` operation can be paginated, you can use the
        call ``client.get_paginator("create_foo")``.

    :raise OperationNotPageableError: Raised if the operation is not
        pageable. You can use the ``client.can_paginate`` method to
        check if an operation is pageable.

    :rtype: ``botocore.paginate.Paginator``
    :return: A paginator object.
    """
    if not self.can_paginate(operation_name):
        raise OperationNotPageableError(operation_name=operation_name)

    api_operation_name = self._PY_TO_OP_NAME[operation_name]

    # A thin proxy around Paginator.paginate; it exists so that a
    # docstring specific to this paginator can be attached to it.
    def paginate(self, **kwargs):
        return Paginator.paginate(self, **kwargs)

    page_config = self._cache['page_config'][api_operation_name]
    paginate.__doc__ = PaginatorDocstring(
        paginator_name=api_operation_name,
        event_emitter=self.meta.events,
        service_model=self.meta.service_model,
        paginator_config=page_config,
        include_signature=False,
    )

    # The generated class is named after the service module and the
    # operation it paginates.
    module_name = get_service_module_name(self.meta.service_model)
    class_name = f"{module_name}.Paginator.{api_operation_name}"
    paginator_cls = type(class_name, (Paginator,), {'paginate': paginate})

    operation_model = self._service_model.operation_model(
        api_operation_name
    )
    return paginator_cls(
        getattr(self, operation_name),
        page_config,
        operation_model,
    )
def can_paginate(self, operation_name):
    """Tell whether the given operation has pagination configuration.

    The ``'pagination'`` section of the service's ``paginators-1`` model
    is loaded on first use and kept in ``self._cache['page_config']``; if
    the loader raises ``DataNotFoundError`` an empty configuration is
    cached instead.

    :type operation_name: string
    :param operation_name: The operation name. This is the same name
        as the method name on the client. For example, if the
        method name is ``create_foo``, and you'd normally invoke the
        operation as ``client.create_foo(**kwargs)``, if the
        ``create_foo`` operation can be paginated, you can use the
        call ``client.get_paginator("create_foo")``. The name is looked
        up in ``self._PY_TO_OP_NAME``.

    :return: ``True`` if the operation can be paginated,
        ``False`` otherwise.
    """
    cache = self._cache
    if 'page_config' not in cache:
        try:
            loaded_model = self._loader.load_service_model(
                self._service_model.service_name,
                'paginators-1',
                self._service_model.api_version,
            )
            cache['page_config'] = loaded_model['pagination']
        except DataNotFoundError:
            cache['page_config'] = {}
    api_operation_name = self._PY_TO_OP_NAME[operation_name]
    return api_operation_name in cache['page_config']
def _get_waiter_config(self):
    """Return the service's ``waiters-2`` model, loading it on first use.

    The result is kept in ``self._cache['waiter_config']``. If the loader
    raises ``DataNotFoundError`` an empty dict is cached and returned.
    """
    cache = self._cache
    if 'waiter_config' in cache:
        return cache['waiter_config']
    try:
        loaded = self._loader.load_service_model(
            self._service_model.service_name,
            'waiters-2',
            self._service_model.api_version,
        )
    except DataNotFoundError:
        loaded = {}
    cache['waiter_config'] = loaded
    return loaded
def get_waiter(self, waiter_name):
    """Returns an object that can wait for some condition.

    :type waiter_name: str
    :param waiter_name: The name of the waiter to get. See the waiters
        section of the service docs for a list of available waiters.

    :returns: The specified waiter object.
    :rtype: botocore.waiter.Waiter

    :raise ValueError: If the service has no waiter configuration, or
        if ``waiter_name`` is not one of its waiters.
    """
    config = self._get_waiter_config()
    if not config:
        # Built with an f-string rather than ``%`` so that an unexpected
        # argument such as a tuple still yields ValueError, not TypeError.
        raise ValueError(f"Waiter does not exist: {waiter_name}")
    model = waiter.WaiterModel(config)
    # Map the transformed names (via ``xform_name``) back to the names
    # used in the waiter model.
    mapping = {xform_name(name): name for name in model.waiter_names}
    if waiter_name not in mapping:
        raise ValueError(f"Waiter does not exist: {waiter_name}")

    return waiter.create_waiter_with_client(
        mapping[waiter_name], model, self
    )
@CachedProperty
def waiter_names(self):
    """Returns a list of all available waiters.

    The list is empty when the service has no waiter configuration;
    otherwise each name from the waiter model is passed through
    ``xform_name``.
    """
    config = self._get_waiter_config()
    if config:
        # The waiter model exposes the waiter names; only those are
        # needed here.
        waiter_model = waiter.WaiterModel(config)
        return [xform_name(name) for name in waiter_model.waiter_names]
    return []
@property
def exceptions(self):
    """The client's exceptions object, created on first access.

    The value comes from ``self._load_exceptions()`` and is stored in
    ``self._exceptions`` so later accesses reuse it.
    """
    loaded = self._exceptions
    if loaded is None:
        loaded = self._exceptions = self._load_exceptions()
    return loaded
def _load_exceptions(self):
    """Create the client exceptions for this client's service model.

    :return: The result of ``create_client_exceptions`` on
        ``self._exceptions_factory``.
    """
    factory = self._exceptions_factory
    return factory.create_client_exceptions(self._service_model)
def _get_credentials(self):
    """Return the credentials held by this client's request signer.

    This is a private interface: it may change in a breaking way, or be
    removed entirely, in any botocore release.
    """
    signer = self._request_signer
    return signer._credentials
class ClientMeta:
    """Container for client information that is not an API operation.

    It serves two purposes:

    * It gives advanced functionality to clients.
    * It keeps additional client attributes in their own namespace,
      separate from the operation names that are mapped to client methods
      at runtime, so they can never collide with an operation name.

    """

    def __init__(
        self,
        events,
        client_config,
        endpoint_url,
        service_model,
        method_to_api_mapping,
        partition,
    ):
        # ``events`` is a plain public attribute; everything else is
        # stored privately and exposed through read-only properties.
        self.events = events
        self._service_model = service_model
        self._client_config = client_config
        self._endpoint_url = endpoint_url
        self._partition = partition
        self._method_to_api_mapping = method_to_api_mapping

    @property
    def config(self):
        """The client config object this meta was created with."""
        return self._client_config

    @property
    def region_name(self):
        """The ``region_name`` attribute of the client config."""
        return self._client_config.region_name

    @property
    def endpoint_url(self):
        """The endpoint URL this meta was created with."""
        return self._endpoint_url

    @property
    def service_model(self):
        """The service model this meta was created with."""
        return self._service_model

    @property
    def method_to_api_mapping(self):
        """The method-to-API mapping this meta was created with."""
        return self._method_to_api_mapping

    @property
    def partition(self):
        """The partition this meta was created with."""
        return self._partition
def _get_configured_signature_version(
    service_name, client_config, scoped_config
):
    """
    Look up the signature version that was configured by hand.

    The client config is consulted first and overrides everything else.
    Failing that, the service's own section of the scoped config is
    checked for a ``signature_version`` entry.

    :returns: the customer configured signature version, or None if no
        signature version was configured.
    """
    if client_config and client_config.signature_version is not None:
        return client_config.signature_version

    if scoped_config is None:
        return None

    # A given service may have service specific configuration in the
    # config file, so that section is checked as well. Anything that is
    # not a dict (including a missing section) is ignored.
    service_section = scoped_config.get(service_name)
    if not isinstance(service_section, dict):
        return None

    version = service_section.get('signature_version')
    if not version:
        return None

    logger.debug(
        "Switching signature version for service %s "
        "to version %s based on config file override.",
        service_name,
        version,
    )
    return version