Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/client.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
15from botocore import (
16 UNSIGNED, # noqa: F401
17 waiter,
18 xform_name,
19)
20from botocore.args import ClientArgsCreator
21from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type
22from botocore.awsrequest import prepare_request_dict
23from botocore.compress import maybe_compress_request
24from botocore.config import Config
25from botocore.context import with_current_context
26from botocore.credentials import RefreshableCredentials
27from botocore.discovery import (
28 EndpointDiscoveryHandler,
29 EndpointDiscoveryManager,
30 block_endpoint_discovery_required_operations,
31)
32from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring
33from botocore.exceptions import (
34 ClientError, # noqa: F401
35 DataNotFoundError,
36 InvalidEndpointDiscoveryConfigurationError,
37 OperationNotPageableError,
38 UnknownServiceError,
39 UnknownSignatureVersionError,
40)
41from botocore.history import get_global_history_recorder
42from botocore.hooks import first_non_none_response
43from botocore.httpchecksum import (
44 apply_request_checksum,
45 resolve_checksum_context,
46)
47from botocore.model import ServiceModel
48from botocore.paginate import Paginator
49from botocore.retries import adaptive, standard
50from botocore.useragent import UserAgentString, register_feature_id
51from botocore.utils import (
52 CachedProperty,
53 EventbridgeSignerSetter,
54 S3ArnParamHandler, # noqa: F401
55 S3ControlArnParamHandler, # noqa: F401
56 S3ControlArnParamHandlerv2,
57 S3ControlEndpointSetter, # noqa: F401
58 S3EndpointSetter, # noqa: F401
59 S3ExpressIdentityResolver,
60 S3RegionRedirector, # noqa: F401
61 S3RegionRedirectorv2,
62 ensure_boolean,
63 get_service_module_name,
64)
66logger = logging.getLogger(__name__)
67history_recorder = get_global_history_recorder()
70class ClientCreator:
71 """Creates client objects for a service."""
73 def __init__(
74 self,
75 loader,
76 endpoint_resolver,
77 user_agent,
78 event_emitter,
79 retry_handler_factory,
80 retry_config_translator,
81 response_parser_factory=None,
82 exceptions_factory=None,
83 config_store=None,
84 user_agent_creator=None,
85 auth_token_resolver=None,
86 ):
87 self._loader = loader
88 self._endpoint_resolver = endpoint_resolver
89 self._user_agent = user_agent
90 self._event_emitter = event_emitter
91 self._retry_handler_factory = retry_handler_factory
92 self._retry_config_translator = retry_config_translator
93 self._response_parser_factory = response_parser_factory
94 self._exceptions_factory = exceptions_factory
95 # TODO: Migrate things away from scoped_config in favor of the
96 # config_store. The config store can pull things from both the scoped
97 # config and environment variables (and potentially more in the
98 # future).
99 self._config_store = config_store
100 self._user_agent_creator = user_agent_creator
101 self._auth_token_resolver = auth_token_resolver
103 def create_client(
104 self,
105 service_name,
106 region_name,
107 is_secure=True,
108 endpoint_url=None,
109 verify=None,
110 credentials=None,
111 scoped_config=None,
112 api_version=None,
113 client_config=None,
114 auth_token=None,
115 ):
116 responses = self._event_emitter.emit(
117 'choose-service-name', service_name=service_name
118 )
119 service_name = first_non_none_response(responses, default=service_name)
120 service_model = self._load_service_model(service_name, api_version)
121 try:
122 endpoints_ruleset_data = self._load_service_endpoints_ruleset(
123 service_name, api_version
124 )
125 partition_data = self._loader.load_data('partitions')
126 except UnknownServiceError:
127 endpoints_ruleset_data = None
128 partition_data = None
129 logger.info(
130 'No endpoints ruleset found for service %s, falling back to '
131 'legacy endpoint routing.',
132 service_name,
133 )
135 cls = self._create_client_class(service_name, service_model)
136 region_name, client_config = self._normalize_fips_region(
137 region_name, client_config
138 )
139 if auth := service_model.metadata.get('auth'):
140 service_signature_version = resolve_auth_type(auth)
141 else:
142 service_signature_version = service_model.metadata.get(
143 'signatureVersion'
144 )
145 endpoint_bridge = ClientEndpointBridge(
146 self._endpoint_resolver,
147 scoped_config,
148 client_config,
149 service_signing_name=service_model.metadata.get('signingName'),
150 config_store=self._config_store,
151 service_signature_version=service_signature_version,
152 )
153 if token := self._evaluate_client_specific_token(
154 service_model.signing_name
155 ):
156 auth_token = token
157 client_args = self._get_client_args(
158 service_model,
159 region_name,
160 is_secure,
161 endpoint_url,
162 verify,
163 credentials,
164 scoped_config,
165 client_config,
166 endpoint_bridge,
167 auth_token,
168 endpoints_ruleset_data,
169 partition_data,
170 )
171 service_client = cls(**client_args)
172 self._register_retries(service_client)
173 self._register_s3_events(
174 client=service_client,
175 endpoint_bridge=None,
176 endpoint_url=None,
177 client_config=client_config,
178 scoped_config=scoped_config,
179 )
180 self._register_s3express_events(client=service_client)
181 self._register_s3_control_events(client=service_client)
182 self._register_importexport_events(client=service_client)
183 self._register_endpoint_discovery(
184 service_client, endpoint_url, client_config
185 )
186 return service_client
188 def create_client_class(self, service_name, api_version=None):
189 service_model = self._load_service_model(service_name, api_version)
190 return self._create_client_class(service_name, service_model)
192 def _create_client_class(self, service_name, service_model):
193 class_attributes = self._create_methods(service_model)
194 py_name_to_operation_name = self._create_name_mapping(service_model)
195 class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
196 bases = [BaseClient]
197 service_id = service_model.service_id.hyphenize()
198 self._event_emitter.emit(
199 f'creating-client-class.{service_id}',
200 class_attributes=class_attributes,
201 base_classes=bases,
202 )
203 class_name = get_service_module_name(service_model)
204 cls = type(str(class_name), tuple(bases), class_attributes)
205 return cls
207 def _normalize_fips_region(self, region_name, client_config):
208 if region_name is not None:
209 normalized_region_name = region_name.replace('fips-', '').replace(
210 '-fips', ''
211 )
212 # If region has been transformed then set flag
213 if normalized_region_name != region_name:
214 config_use_fips_endpoint = Config(use_fips_endpoint=True)
215 if client_config:
216 # Keeping endpoint setting client specific
217 client_config = client_config.merge(
218 config_use_fips_endpoint
219 )
220 else:
221 client_config = config_use_fips_endpoint
222 logger.warning(
223 f'transforming region from {region_name} to '
224 f'{normalized_region_name} and setting '
225 'use_fips_endpoint to true. client should not '
226 'be configured with a fips psuedo region.'
227 )
228 region_name = normalized_region_name
229 return region_name, client_config
231 def _load_service_model(self, service_name, api_version=None):
232 json_model = self._loader.load_service_model(
233 service_name, 'service-2', api_version=api_version
234 )
235 service_model = ServiceModel(json_model, service_name=service_name)
236 return service_model
238 def _load_service_endpoints_ruleset(self, service_name, api_version=None):
239 return self._loader.load_service_model(
240 service_name, 'endpoint-rule-set-1', api_version=api_version
241 )
243 def _register_retries(self, client):
244 retry_mode = client.meta.config.retries['mode']
245 if retry_mode == 'standard':
246 self._register_v2_standard_retries(client)
247 elif retry_mode == 'adaptive':
248 self._register_v2_standard_retries(client)
249 self._register_v2_adaptive_retries(client)
250 elif retry_mode == 'legacy':
251 self._register_legacy_retries(client)
252 else:
253 return
254 register_feature_id(f'RETRY_MODE_{retry_mode.upper()}')
256 def _register_v2_standard_retries(self, client):
257 max_attempts = client.meta.config.retries.get('total_max_attempts')
258 kwargs = {'client': client}
259 if max_attempts is not None:
260 kwargs['max_attempts'] = max_attempts
261 standard.register_retry_handler(**kwargs)
263 def _register_v2_adaptive_retries(self, client):
264 adaptive.register_retry_handler(client)
266 def _register_legacy_retries(self, client):
267 endpoint_prefix = client.meta.service_model.endpoint_prefix
268 service_id = client.meta.service_model.service_id
269 service_event_name = service_id.hyphenize()
271 # First, we load the entire retry config for all services,
272 # then pull out just the information we need.
273 original_config = self._loader.load_data('_retry')
274 if not original_config:
275 return
277 retries = self._transform_legacy_retries(client.meta.config.retries)
278 retry_config = self._retry_config_translator.build_retry_config(
279 endpoint_prefix,
280 original_config.get('retry', {}),
281 original_config.get('definitions', {}),
282 retries,
283 )
285 logger.debug(
286 "Registering retry handlers for service: %s",
287 client.meta.service_model.service_name,
288 )
289 handler = self._retry_handler_factory.create_retry_handler(
290 retry_config, endpoint_prefix
291 )
292 unique_id = f'retry-config-{service_event_name}'
293 client.meta.events.register(
294 f"needs-retry.{service_event_name}", handler, unique_id=unique_id
295 )
297 def _transform_legacy_retries(self, retries):
298 if retries is None:
299 return
300 copied_args = retries.copy()
301 if 'total_max_attempts' in retries:
302 copied_args = retries.copy()
303 copied_args['max_attempts'] = (
304 copied_args.pop('total_max_attempts') - 1
305 )
306 return copied_args
308 def _get_retry_mode(self, client, config_store):
309 client_retries = client.meta.config.retries
310 if (
311 client_retries is not None
312 and client_retries.get('mode') is not None
313 ):
314 return client_retries['mode']
315 return config_store.get_config_variable('retry_mode') or 'legacy'
317 def _register_endpoint_discovery(self, client, endpoint_url, config):
318 if endpoint_url is not None:
319 # Don't register any handlers in the case of a custom endpoint url
320 return
321 # Only attach handlers if the service supports discovery
322 if client.meta.service_model.endpoint_discovery_operation is None:
323 return
324 events = client.meta.events
325 service_id = client.meta.service_model.service_id.hyphenize()
326 enabled = False
327 if config and config.endpoint_discovery_enabled is not None:
328 enabled = config.endpoint_discovery_enabled
329 elif self._config_store:
330 enabled = self._config_store.get_config_variable(
331 'endpoint_discovery_enabled'
332 )
334 enabled = self._normalize_endpoint_discovery_config(enabled)
335 if enabled and self._requires_endpoint_discovery(client, enabled):
336 discover = enabled is True
337 manager = EndpointDiscoveryManager(
338 client, always_discover=discover
339 )
340 handler = EndpointDiscoveryHandler(manager)
341 handler.register(events, service_id)
342 else:
343 events.register(
344 'before-parameter-build',
345 block_endpoint_discovery_required_operations,
346 )
348 def _normalize_endpoint_discovery_config(self, enabled):
349 """Config must either be a boolean-string or string-literal 'auto'"""
350 if isinstance(enabled, str):
351 enabled = enabled.lower().strip()
352 if enabled == 'auto':
353 return enabled
354 elif enabled in ('true', 'false'):
355 return ensure_boolean(enabled)
356 elif isinstance(enabled, bool):
357 return enabled
359 raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)
361 def _requires_endpoint_discovery(self, client, enabled):
362 if enabled == "auto":
363 return client.meta.service_model.endpoint_discovery_required
364 return enabled
366 def _register_eventbridge_events(
367 self, client, endpoint_bridge, endpoint_url
368 ):
369 if client.meta.service_model.service_name != 'events':
370 return
371 EventbridgeSignerSetter(
372 endpoint_resolver=self._endpoint_resolver,
373 region=client.meta.region_name,
374 endpoint_url=endpoint_url,
375 ).register(client.meta.events)
377 def _register_s3express_events(
378 self,
379 client,
380 endpoint_bridge=None,
381 endpoint_url=None,
382 client_config=None,
383 scoped_config=None,
384 ):
385 if client.meta.service_model.service_name != 's3':
386 return
387 S3ExpressIdentityResolver(client, RefreshableCredentials).register()
389 def _register_s3_events(
390 self,
391 client,
392 endpoint_bridge,
393 endpoint_url,
394 client_config,
395 scoped_config,
396 ):
397 if client.meta.service_model.service_name != 's3':
398 return
399 S3RegionRedirectorv2(None, client).register()
400 self._set_s3_presign_signature_version(
401 client.meta, client_config, scoped_config
402 )
403 client.meta.events.register(
404 'before-parameter-build.s3', self._inject_s3_input_parameters
405 )
407 def _register_s3_control_events(
408 self,
409 client,
410 endpoint_bridge=None,
411 endpoint_url=None,
412 client_config=None,
413 scoped_config=None,
414 ):
415 if client.meta.service_model.service_name != 's3control':
416 return
417 S3ControlArnParamHandlerv2().register(client.meta.events)
419 def _set_s3_presign_signature_version(
420 self, client_meta, client_config, scoped_config
421 ):
422 # This will return the manually configured signature version, or None
423 # if none was manually set. If a customer manually sets the signature
424 # version, we always want to use what they set.
425 provided_signature_version = _get_configured_signature_version(
426 's3', client_config, scoped_config
427 )
428 if provided_signature_version is not None:
429 return
431 # Check to see if the region is a region that we know about. If we
432 # don't know about a region, then we can safely assume it's a new
433 # region that is sigv4 only, since all new S3 regions only allow sigv4.
434 # The only exception is aws-global. This is a pseudo-region for the
435 # global endpoint, we should respect the signature versions it
436 # supports, which includes v2.
437 regions = self._endpoint_resolver.get_available_endpoints(
438 's3', client_meta.partition
439 )
440 if (
441 client_meta.region_name != 'aws-global'
442 and client_meta.region_name not in regions
443 ):
444 return
446 # If it is a region we know about, we want to default to sigv2, so here
447 # we check to see if it is available.
448 endpoint = self._endpoint_resolver.construct_endpoint(
449 's3', client_meta.region_name
450 )
451 signature_versions = endpoint['signatureVersions']
452 if 's3' not in signature_versions:
453 return
455 # We now know that we're in a known region that supports sigv2 and
456 # the customer hasn't set a signature version so we default the
457 # signature version to sigv2.
458 client_meta.events.register(
459 'choose-signer.s3', self._default_s3_presign_to_sigv2
460 )
462 def _inject_s3_input_parameters(self, params, context, **kwargs):
463 context['input_params'] = {}
464 inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix')
465 for inject_parameter in inject_parameters:
466 if inject_parameter in params:
467 context['input_params'][inject_parameter] = params[
468 inject_parameter
469 ]
471 def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
472 """
473 Returns the 's3' (sigv2) signer if presigning an s3 request. This is
474 intended to be used to set the default signature version for the signer
475 to sigv2. Situations where an asymmetric signature is required are the
476 exception, for example MRAP needs v4a.
478 :type signature_version: str
479 :param signature_version: The current client signature version.
481 :type signing_name: str
482 :param signing_name: The signing name of the service.
484 :return: 's3' if the request is an s3 presign request, None otherwise
485 """
486 if signature_version.startswith('v4a'):
487 return
489 if signature_version.startswith('v4-s3express'):
490 return signature_version
492 for suffix in ['-query', '-presign-post']:
493 if signature_version.endswith(suffix):
494 return f's3{suffix}'
496 def _register_importexport_events(
497 self,
498 client,
499 endpoint_bridge=None,
500 endpoint_url=None,
501 client_config=None,
502 scoped_config=None,
503 ):
504 if client.meta.service_model.service_name != 'importexport':
505 return
506 self._set_importexport_signature_version(
507 client.meta, client_config, scoped_config
508 )
510 def _set_importexport_signature_version(
511 self, client_meta, client_config, scoped_config
512 ):
513 # This will return the manually configured signature version, or None
514 # if none was manually set. If a customer manually sets the signature
515 # version, we always want to use what they set.
516 configured_signature_version = _get_configured_signature_version(
517 'importexport', client_config, scoped_config
518 )
519 if configured_signature_version is not None:
520 return
522 # importexport has a modeled signatureVersion of v2, but we
523 # previously switched to v4 via endpoint.json before endpoint rulesets.
524 # Override the model's signatureVersion for backwards compatability.
525 client_meta.events.register(
526 'choose-signer.importexport', self._default_signer_to_sigv4
527 )
529 def _default_signer_to_sigv4(self, signature_version, **kwargs):
530 return 'v4'
532 def _get_client_args(
533 self,
534 service_model,
535 region_name,
536 is_secure,
537 endpoint_url,
538 verify,
539 credentials,
540 scoped_config,
541 client_config,
542 endpoint_bridge,
543 auth_token,
544 endpoints_ruleset_data,
545 partition_data,
546 ):
547 args_creator = ClientArgsCreator(
548 self._event_emitter,
549 self._user_agent,
550 self._response_parser_factory,
551 self._loader,
552 self._exceptions_factory,
553 config_store=self._config_store,
554 user_agent_creator=self._user_agent_creator,
555 )
556 return args_creator.get_client_args(
557 service_model,
558 region_name,
559 is_secure,
560 endpoint_url,
561 verify,
562 credentials,
563 scoped_config,
564 client_config,
565 endpoint_bridge,
566 auth_token,
567 endpoints_ruleset_data,
568 partition_data,
569 )
571 def _create_methods(self, service_model):
572 op_dict = {}
573 for operation_name in service_model.operation_names:
574 py_operation_name = xform_name(operation_name)
575 op_dict[py_operation_name] = self._create_api_method(
576 py_operation_name, operation_name, service_model
577 )
578 return op_dict
580 def _create_name_mapping(self, service_model):
581 # py_name -> OperationName, for every operation available
582 # for a service.
583 mapping = {}
584 for operation_name in service_model.operation_names:
585 py_operation_name = xform_name(operation_name)
586 mapping[py_operation_name] = operation_name
587 return mapping
589 def _create_api_method(
590 self, py_operation_name, operation_name, service_model
591 ):
592 def _api_call(self, *args, **kwargs):
593 # We're accepting *args so that we can give a more helpful
594 # error message than TypeError: _api_call takes exactly
595 # 1 argument.
596 if args:
597 raise TypeError(
598 f"{py_operation_name}() only accepts keyword arguments."
599 )
600 # The "self" in this scope is referring to the BaseClient.
601 return self._make_api_call(operation_name, kwargs)
603 _api_call.__name__ = str(py_operation_name)
605 # Add the docstring to the client method
606 operation_model = service_model.operation_model(operation_name)
607 docstring = ClientMethodDocstring(
608 operation_model=operation_model,
609 method_name=operation_name,
610 event_emitter=self._event_emitter,
611 method_description=operation_model.documentation,
612 example_prefix=f'response = client.{py_operation_name}',
613 include_signature=False,
614 )
615 _api_call.__doc__ = docstring
616 return _api_call
618 def _evaluate_client_specific_token(self, signing_name):
619 # Resolves an auth_token for the given signing_name.
620 # Returns None if no resolver is set or if resolution fails.
621 resolver = self._auth_token_resolver
622 if not resolver or not signing_name:
623 return None
625 return resolver(signing_name=signing_name)
628class ClientEndpointBridge:
629 """Bridges endpoint data and client creation
631 This class handles taking out the relevant arguments from the endpoint
632 resolver and determining which values to use, taking into account any
633 client configuration options and scope configuration options.
635 This class also handles determining what, if any, region to use if no
636 explicit region setting is provided. For example, Amazon S3 client will
637 utilize "us-east-1" by default if no region can be resolved."""
639 DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
640 _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']
642 def __init__(
643 self,
644 endpoint_resolver,
645 scoped_config=None,
646 client_config=None,
647 default_endpoint=None,
648 service_signing_name=None,
649 config_store=None,
650 service_signature_version=None,
651 ):
652 self.service_signing_name = service_signing_name
653 self.endpoint_resolver = endpoint_resolver
654 self.scoped_config = scoped_config
655 self.client_config = client_config
656 self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
657 self.config_store = config_store
658 self.service_signature_version = service_signature_version
660 def resolve(
661 self, service_name, region_name=None, endpoint_url=None, is_secure=True
662 ):
663 region_name = self._check_default_region(service_name, region_name)
664 use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
665 service_name
666 )
667 use_fips_endpoint = self._resolve_endpoint_variant_config_var(
668 'use_fips_endpoint'
669 )
670 resolved = self.endpoint_resolver.construct_endpoint(
671 service_name,
672 region_name,
673 use_dualstack_endpoint=use_dualstack_endpoint,
674 use_fips_endpoint=use_fips_endpoint,
675 )
677 # If we can't resolve the region, we'll attempt to get a global
678 # endpoint for non-regionalized services (iam, route53, etc)
679 if not resolved:
680 # TODO: fallback partition_name should be configurable in the
681 # future for users to define as needed.
682 resolved = self.endpoint_resolver.construct_endpoint(
683 service_name,
684 region_name,
685 partition_name='aws',
686 use_dualstack_endpoint=use_dualstack_endpoint,
687 use_fips_endpoint=use_fips_endpoint,
688 )
690 if resolved:
691 return self._create_endpoint(
692 resolved, service_name, region_name, endpoint_url, is_secure
693 )
694 else:
695 return self._assume_endpoint(
696 service_name, region_name, endpoint_url, is_secure
697 )
699 def resolver_uses_builtin_data(self):
700 return self.endpoint_resolver.uses_builtin_data
702 def _check_default_region(self, service_name, region_name):
703 if region_name is not None:
704 return region_name
705 # Use the client_config region if no explicit region was provided.
706 if self.client_config and self.client_config.region_name is not None:
707 return self.client_config.region_name
709 def _create_endpoint(
710 self, resolved, service_name, region_name, endpoint_url, is_secure
711 ):
712 region_name, signing_region = self._pick_region_values(
713 resolved, region_name, endpoint_url
714 )
715 if endpoint_url is None:
716 endpoint_url = self._make_url(
717 resolved.get('hostname'),
718 is_secure,
719 resolved.get('protocols', []),
720 )
721 signature_version = self._resolve_signature_version(
722 service_name, resolved
723 )
724 signing_name = self._resolve_signing_name(service_name, resolved)
725 return self._create_result(
726 service_name=service_name,
727 region_name=region_name,
728 signing_region=signing_region,
729 signing_name=signing_name,
730 endpoint_url=endpoint_url,
731 metadata=resolved,
732 signature_version=signature_version,
733 )
735 def _resolve_endpoint_variant_config_var(self, config_var):
736 client_config = self.client_config
737 config_val = False
739 # Client configuration arg has precedence
740 if client_config and getattr(client_config, config_var) is not None:
741 return getattr(client_config, config_var)
742 elif self.config_store is not None:
743 # Check config store
744 config_val = self.config_store.get_config_variable(config_var)
745 return config_val
747 def _resolve_use_dualstack_endpoint(self, service_name):
748 s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
749 if s3_dualstack_mode is not None:
750 return s3_dualstack_mode
751 return self._resolve_endpoint_variant_config_var(
752 'use_dualstack_endpoint'
753 )
755 def _is_s3_dualstack_mode(self, service_name):
756 if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
757 return None
758 # TODO: This normalization logic is duplicated from the
759 # ClientArgsCreator class. Consolidate everything to
760 # ClientArgsCreator. _resolve_signature_version also has similarly
761 # duplicated logic.
762 client_config = self.client_config
763 if (
764 client_config is not None
765 and client_config.s3 is not None
766 and 'use_dualstack_endpoint' in client_config.s3
767 ):
768 # Client config trumps scoped config.
769 return client_config.s3['use_dualstack_endpoint']
770 if self.scoped_config is not None:
771 enabled = self.scoped_config.get('s3', {}).get(
772 'use_dualstack_endpoint'
773 )
774 if enabled in [True, 'True', 'true']:
775 return True
777 def _assume_endpoint(
778 self, service_name, region_name, endpoint_url, is_secure
779 ):
780 if endpoint_url is None:
781 # Expand the default hostname URI template.
782 hostname = self.default_endpoint.format(
783 service=service_name, region=region_name
784 )
785 endpoint_url = self._make_url(
786 hostname, is_secure, ['http', 'https']
787 )
788 logger.debug(
789 f'Assuming an endpoint for {service_name}, {region_name}: {endpoint_url}'
790 )
791 # We still want to allow the user to provide an explicit version.
792 signature_version = self._resolve_signature_version(
793 service_name, {'signatureVersions': ['v4']}
794 )
795 signing_name = self._resolve_signing_name(service_name, resolved={})
796 return self._create_result(
797 service_name=service_name,
798 region_name=region_name,
799 signing_region=region_name,
800 signing_name=signing_name,
801 signature_version=signature_version,
802 endpoint_url=endpoint_url,
803 metadata={},
804 )
806 def _create_result(
807 self,
808 service_name,
809 region_name,
810 signing_region,
811 signing_name,
812 endpoint_url,
813 signature_version,
814 metadata,
815 ):
816 return {
817 'service_name': service_name,
818 'region_name': region_name,
819 'signing_region': signing_region,
820 'signing_name': signing_name,
821 'endpoint_url': endpoint_url,
822 'signature_version': signature_version,
823 'metadata': metadata,
824 }
826 def _make_url(self, hostname, is_secure, supported_protocols):
827 if is_secure and 'https' in supported_protocols:
828 scheme = 'https'
829 else:
830 scheme = 'http'
831 return f'{scheme}://{hostname}'
833 def _resolve_signing_name(self, service_name, resolved):
834 # CredentialScope overrides everything else.
835 if (
836 'credentialScope' in resolved
837 and 'service' in resolved['credentialScope']
838 ):
839 return resolved['credentialScope']['service']
840 # Use the signingName from the model if present.
841 if self.service_signing_name:
842 return self.service_signing_name
843 # Just assume is the same as the service name.
844 return service_name
846 def _pick_region_values(self, resolved, region_name, endpoint_url):
847 signing_region = region_name
848 if endpoint_url is None:
849 # Do not use the region name or signing name from the resolved
850 # endpoint if the user explicitly provides an endpoint_url. This
851 # would happen if we resolve to an endpoint where the service has
852 # a "defaults" section that overrides all endpoint with a single
853 # hostname and credentialScope. This has been the case historically
854 # for how STS has worked. The only way to resolve an STS endpoint
855 # was to provide a region_name and an endpoint_url. In that case,
856 # we would still resolve an endpoint, but we would not use the
857 # resolved endpointName or signingRegion because we want to allow
858 # custom endpoints.
859 region_name = resolved['endpointName']
860 signing_region = region_name
861 if (
862 'credentialScope' in resolved
863 and 'region' in resolved['credentialScope']
864 ):
865 signing_region = resolved['credentialScope']['region']
866 return region_name, signing_region
868 def _resolve_signature_version(self, service_name, resolved):
869 configured_version = _get_configured_signature_version(
870 service_name, self.client_config, self.scoped_config
871 )
872 if configured_version is not None:
873 return configured_version
875 # These have since added the "auth" key to the service model
876 # with "aws.auth#sigv4", but preserve existing behavior from
877 # when we preferred endpoints.json over the service models
878 if service_name in ('s3', 's3-control'):
879 return 's3v4'
881 if self.service_signature_version is not None:
882 # Prefer the service model
883 potential_versions = [self.service_signature_version]
884 else:
885 # Fall back to endpoints.json to preserve existing behavior, which
886 # may be useful for users who have custom service models
887 potential_versions = resolved.get('signatureVersions', [])
888 # This was added for the V2 -> V4 transition,
889 # for services that added V4 after V2 in endpoints.json
890 if 'v4' in potential_versions:
891 return 'v4'
892 # Now just iterate over the signature versions in order until we
893 # find the first one that is known to Botocore.
894 for known in potential_versions:
895 if known in AUTH_TYPE_MAPS:
896 return known
898 raise UnknownSignatureVersionError(
899 signature_version=potential_versions
900 )
903class BaseClient:
904 # This is actually reassigned with the py->op_name mapping
905 # when the client creator creates the subclass. This value is used
906 # because calls such as client.get_paginator('list_objects') use the
907 # snake_case name, but we need to know the ListObjects form.
908 # xform_name() does the ListObjects->list_objects conversion, but
909 # we need the reverse mapping here.
910 _PY_TO_OP_NAME = {}
912 def __init__(
913 self,
914 serializer,
915 endpoint,
916 response_parser,
917 event_emitter,
918 request_signer,
919 service_model,
920 loader,
921 client_config,
922 partition,
923 exceptions_factory,
924 endpoint_ruleset_resolver=None,
925 user_agent_creator=None,
926 ):
927 self._serializer = serializer
928 self._endpoint = endpoint
929 self._ruleset_resolver = endpoint_ruleset_resolver
930 self._response_parser = response_parser
931 self._request_signer = request_signer
932 self._cache = {}
933 self._loader = loader
934 self._client_config = client_config
935 self.meta = ClientMeta(
936 event_emitter,
937 self._client_config,
938 endpoint.host,
939 service_model,
940 self._PY_TO_OP_NAME,
941 partition,
942 )
943 self._exceptions_factory = exceptions_factory
944 self._exceptions = None
945 self._user_agent_creator = user_agent_creator
946 if self._user_agent_creator is None:
947 self._user_agent_creator = (
948 UserAgentString.from_environment().with_client_config(
949 self._client_config
950 )
951 )
952 self._register_handlers()
954 def __getattr__(self, item):
955 service_id = self._service_model.service_id.hyphenize()
956 event_name = f'getattr.{service_id}.{item}'
958 handler, event_response = self.meta.events.emit_until_response(
959 event_name, client=self
960 )
962 if event_response is not None:
963 return event_response
965 raise AttributeError(
966 f"'{self.__class__.__name__}' object has no attribute '{item}'"
967 )
969 def close(self):
970 """Closes underlying endpoint connections."""
971 self._endpoint.close()
973 def _register_handlers(self):
974 # Register the handler required to sign requests.
975 service_id = self.meta.service_model.service_id.hyphenize()
976 self.meta.events.register(
977 f"request-created.{service_id}", self._request_signer.handler
978 )
979 # Rebuild user agent string right before request is sent
980 # to ensure all registered features are included.
981 self.meta.events.register_last(
982 f"request-created.{service_id}",
983 self._user_agent_creator.rebuild_and_replace_user_agent_handler,
984 )
986 @property
987 def _service_model(self):
988 return self.meta.service_model
990 @with_current_context()
991 def _make_api_call(self, operation_name, api_params):
992 operation_model = self._service_model.operation_model(operation_name)
993 service_name = self._service_model.service_name
994 history_recorder.record(
995 'API_CALL',
996 {
997 'service': service_name,
998 'operation': operation_name,
999 'params': api_params,
1000 },
1001 )
1002 if operation_model.deprecated:
1003 logger.debug(
1004 'Warning: %s.%s() is deprecated', service_name, operation_name
1005 )
1006 request_context = {
1007 'client_region': self.meta.region_name,
1008 'client_config': self.meta.config,
1009 'has_streaming_input': operation_model.has_streaming_input,
1010 'auth_type': operation_model.resolved_auth_type,
1011 'unsigned_payload': operation_model.unsigned_payload,
1012 'auth_options': self._service_model.metadata.get('auth'),
1013 }
1015 api_params = self._emit_api_params(
1016 api_params=api_params,
1017 operation_model=operation_model,
1018 context=request_context,
1019 )
1020 (
1021 endpoint_url,
1022 additional_headers,
1023 properties,
1024 ) = self._resolve_endpoint_ruleset(
1025 operation_model, api_params, request_context
1026 )
1027 if properties:
1028 # Pass arbitrary endpoint info with the Request
1029 # for use during construction.
1030 request_context['endpoint_properties'] = properties
1031 request_dict = self._convert_to_request_dict(
1032 api_params=api_params,
1033 operation_model=operation_model,
1034 endpoint_url=endpoint_url,
1035 context=request_context,
1036 headers=additional_headers,
1037 )
1038 resolve_checksum_context(request_dict, operation_model, api_params)
1040 service_id = self._service_model.service_id.hyphenize()
1041 handler, event_response = self.meta.events.emit_until_response(
1042 f'before-call.{service_id}.{operation_name}',
1043 model=operation_model,
1044 params=request_dict,
1045 request_signer=self._request_signer,
1046 context=request_context,
1047 )
1049 if event_response is not None:
1050 http, parsed_response = event_response
1051 else:
1052 maybe_compress_request(
1053 self.meta.config, request_dict, operation_model
1054 )
1055 apply_request_checksum(request_dict)
1056 http, parsed_response = self._make_request(
1057 operation_model, request_dict, request_context
1058 )
1060 self.meta.events.emit(
1061 f'after-call.{service_id}.{operation_name}',
1062 http_response=http,
1063 parsed=parsed_response,
1064 model=operation_model,
1065 context=request_context,
1066 )
1068 if http.status_code >= 300:
1069 error_info = parsed_response.get("Error", {})
1070 error_code = error_info.get("QueryErrorCode") or error_info.get(
1071 "Code"
1072 )
1073 error_class = self.exceptions.from_code(error_code)
1074 raise error_class(parsed_response, operation_name)
1075 else:
1076 return parsed_response
1078 def _make_request(self, operation_model, request_dict, request_context):
1079 try:
1080 return self._endpoint.make_request(operation_model, request_dict)
1081 except Exception as e:
1082 self.meta.events.emit(
1083 f'after-call-error.{self._service_model.service_id.hyphenize()}.{operation_model.name}',
1084 exception=e,
1085 context=request_context,
1086 )
1087 raise
1089 def _convert_to_request_dict(
1090 self,
1091 api_params,
1092 operation_model,
1093 endpoint_url,
1094 context=None,
1095 headers=None,
1096 set_user_agent_header=True,
1097 ):
1098 request_dict = self._serializer.serialize_to_request(
1099 api_params, operation_model
1100 )
1101 if not self._client_config.inject_host_prefix:
1102 request_dict.pop('host_prefix', None)
1103 if headers is not None:
1104 request_dict['headers'].update(headers)
1105 if set_user_agent_header:
1106 user_agent = self._user_agent_creator.to_string()
1107 else:
1108 user_agent = None
1109 prepare_request_dict(
1110 request_dict,
1111 endpoint_url=endpoint_url,
1112 user_agent=user_agent,
1113 context=context,
1114 )
1115 return request_dict
1117 def _emit_api_params(self, api_params, operation_model, context):
1118 # Given the API params provided by the user and the operation_model
1119 # we can serialize the request to a request_dict.
1120 operation_name = operation_model.name
1122 # Emit an event that allows users to modify the parameters at the
1123 # beginning of the method. It allows handlers to modify existing
1124 # parameters or return a new set of parameters to use.
1125 service_id = self._service_model.service_id.hyphenize()
1126 responses = self.meta.events.emit(
1127 f'provide-client-params.{service_id}.{operation_name}',
1128 params=api_params,
1129 model=operation_model,
1130 context=context,
1131 )
1132 api_params = first_non_none_response(responses, default=api_params)
1134 self.meta.events.emit(
1135 f'before-parameter-build.{service_id}.{operation_name}',
1136 params=api_params,
1137 model=operation_model,
1138 context=context,
1139 )
1140 return api_params
1142 def _resolve_endpoint_ruleset(
1143 self,
1144 operation_model,
1145 params,
1146 request_context,
1147 ignore_signing_region=False,
1148 ):
1149 """Returns endpoint URL and list of additional headers returned from
1150 EndpointRulesetResolver for the given operation and params. If the
1151 ruleset resolver is not available, for example because the service has
1152 no endpoints ruleset file, the legacy endpoint resolver's value is
1153 returned.
1155 Use ignore_signing_region for generating presigned URLs or any other
1156 situation where the signing region information from the ruleset
1157 resolver should be ignored.
1159 Returns tuple of URL and headers dictionary. Additionally, the
1160 request_context dict is modified in place with any signing information
1161 returned from the ruleset resolver.
1162 """
1163 if self._ruleset_resolver is None:
1164 endpoint_url = self.meta.endpoint_url
1165 additional_headers = {}
1166 endpoint_properties = {}
1167 else:
1168 endpoint_info = self._ruleset_resolver.construct_endpoint(
1169 operation_model=operation_model,
1170 call_args=params,
1171 request_context=request_context,
1172 )
1173 endpoint_url = endpoint_info.url
1174 additional_headers = endpoint_info.headers
1175 endpoint_properties = endpoint_info.properties
1176 # If authSchemes is present, overwrite default auth type and
1177 # signing context derived from service model.
1178 auth_schemes = endpoint_info.properties.get('authSchemes')
1179 if auth_schemes is not None:
1180 auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx(
1181 auth_schemes
1182 )
1183 auth_type, signing_context = auth_info
1184 request_context['auth_type'] = auth_type
1185 if 'region' in signing_context and ignore_signing_region:
1186 del signing_context['region']
1187 if 'signing' in request_context:
1188 request_context['signing'].update(signing_context)
1189 else:
1190 request_context['signing'] = signing_context
1192 return endpoint_url, additional_headers, endpoint_properties
1194 def get_paginator(self, operation_name):
1195 """Create a paginator for an operation.
1197 :type operation_name: string
1198 :param operation_name: The operation name. This is the same name
1199 as the method name on the client. For example, if the
1200 method name is ``create_foo``, and you'd normally invoke the
1201 operation as ``client.create_foo(**kwargs)``, if the
1202 ``create_foo`` operation can be paginated, you can use the
1203 call ``client.get_paginator("create_foo")``.
1205 :raise OperationNotPageableError: Raised if the operation is not
1206 pageable. You can use the ``client.can_paginate`` method to
1207 check if an operation is pageable.
1209 :rtype: ``botocore.paginate.Paginator``
1210 :return: A paginator object.
1212 """
1213 if not self.can_paginate(operation_name):
1214 raise OperationNotPageableError(operation_name=operation_name)
1215 else:
1216 actual_operation_name = self._PY_TO_OP_NAME[operation_name]
1218 # Create a new paginate method that will serve as a proxy to
1219 # the underlying Paginator.paginate method. This is needed to
1220 # attach a docstring to the method.
1221 def paginate(self, **kwargs):
1222 return Paginator.paginate(self, **kwargs)
1224 paginator_config = self._cache['page_config'][
1225 actual_operation_name
1226 ]
1227 # Add the docstring for the paginate method.
1228 paginate.__doc__ = PaginatorDocstring(
1229 paginator_name=actual_operation_name,
1230 event_emitter=self.meta.events,
1231 service_model=self.meta.service_model,
1232 paginator_config=paginator_config,
1233 include_signature=False,
1234 )
1236 # Rename the paginator class based on the type of paginator.
1237 service_module_name = get_service_module_name(
1238 self.meta.service_model
1239 )
1240 paginator_class_name = (
1241 f"{service_module_name}.Paginator.{actual_operation_name}"
1242 )
1244 # Create the new paginator class
1245 documented_paginator_cls = type(
1246 paginator_class_name, (Paginator,), {'paginate': paginate}
1247 )
1249 operation_model = self._service_model.operation_model(
1250 actual_operation_name
1251 )
1252 paginator = documented_paginator_cls(
1253 getattr(self, operation_name),
1254 paginator_config,
1255 operation_model,
1256 )
1257 return paginator
1259 def can_paginate(self, operation_name):
1260 """Check if an operation can be paginated.
1262 :type operation_name: string
1263 :param operation_name: The operation name. This is the same name
1264 as the method name on the client. For example, if the
1265 method name is ``create_foo``, and you'd normally invoke the
1266 operation as ``client.create_foo(**kwargs)``, if the
1267 ``create_foo`` operation can be paginated, you can use the
1268 call ``client.get_paginator("create_foo")``.
1270 :return: ``True`` if the operation can be paginated,
1271 ``False`` otherwise.
1273 """
1274 if 'page_config' not in self._cache:
1275 try:
1276 page_config = self._loader.load_service_model(
1277 self._service_model.service_name,
1278 'paginators-1',
1279 self._service_model.api_version,
1280 )['pagination']
1281 self._cache['page_config'] = page_config
1282 except DataNotFoundError:
1283 self._cache['page_config'] = {}
1284 actual_operation_name = self._PY_TO_OP_NAME[operation_name]
1285 return actual_operation_name in self._cache['page_config']
1287 def _get_waiter_config(self):
1288 if 'waiter_config' not in self._cache:
1289 try:
1290 waiter_config = self._loader.load_service_model(
1291 self._service_model.service_name,
1292 'waiters-2',
1293 self._service_model.api_version,
1294 )
1295 self._cache['waiter_config'] = waiter_config
1296 except DataNotFoundError:
1297 self._cache['waiter_config'] = {}
1298 return self._cache['waiter_config']
1300 def get_waiter(self, waiter_name):
1301 """Returns an object that can wait for some condition.
1303 :type waiter_name: str
1304 :param waiter_name: The name of the waiter to get. See the waiters
1305 section of the service docs for a list of available waiters.
1307 :returns: The specified waiter object.
1308 :rtype: ``botocore.waiter.Waiter``
1309 """
1310 config = self._get_waiter_config()
1311 if not config:
1312 raise ValueError(f"Waiter does not exist: {waiter_name}")
1313 model = waiter.WaiterModel(config)
1314 mapping = {}
1315 for name in model.waiter_names:
1316 mapping[xform_name(name)] = name
1317 if waiter_name not in mapping:
1318 raise ValueError(f"Waiter does not exist: {waiter_name}")
1320 return waiter.create_waiter_with_client(
1321 mapping[waiter_name], model, self
1322 )
1324 @CachedProperty
1325 def waiter_names(self):
1326 """Returns a list of all available waiters."""
1327 config = self._get_waiter_config()
1328 if not config:
1329 return []
1330 model = waiter.WaiterModel(config)
1331 # Waiter configs is a dict, we just want the waiter names
1332 # which are the keys in the dict.
1333 return [xform_name(name) for name in model.waiter_names]
1335 @property
1336 def exceptions(self):
1337 if self._exceptions is None:
1338 self._exceptions = self._load_exceptions()
1339 return self._exceptions
1341 def _load_exceptions(self):
1342 return self._exceptions_factory.create_client_exceptions(
1343 self._service_model
1344 )
1346 def _get_credentials(self):
1347 """
1348 This private interface is subject to abrupt breaking changes, including
1349 removal, in any botocore release.
1350 """
1351 return self._request_signer._credentials
1354class ClientMeta:
1355 """Holds additional client methods.
1357 This class holds additional information for clients. It exists for
1358 two reasons:
1360 * To give advanced functionality to clients
1361 * To namespace additional client attributes from the operation
1362 names which are mapped to methods at runtime. This avoids
1363 ever running into collisions with operation names.
1365 """
1367 def __init__(
1368 self,
1369 events,
1370 client_config,
1371 endpoint_url,
1372 service_model,
1373 method_to_api_mapping,
1374 partition,
1375 ):
1376 self.events = events
1377 self._client_config = client_config
1378 self._endpoint_url = endpoint_url
1379 self._service_model = service_model
1380 self._method_to_api_mapping = method_to_api_mapping
1381 self._partition = partition
1383 @property
1384 def service_model(self):
1385 return self._service_model
1387 @property
1388 def region_name(self):
1389 return self._client_config.region_name
1391 @property
1392 def endpoint_url(self):
1393 return self._endpoint_url
1395 @property
1396 def config(self):
1397 return self._client_config
1399 @property
1400 def method_to_api_mapping(self):
1401 return self._method_to_api_mapping
1403 @property
1404 def partition(self):
1405 return self._partition
1408def _get_configured_signature_version(
1409 service_name, client_config, scoped_config
1410):
1411 """
1412 Gets the manually configured signature version.
1414 :returns: the customer configured signature version, or None if no
1415 signature version was configured.
1416 """
1417 # Client config overrides everything.
1418 if client_config and client_config.signature_version is not None:
1419 return client_config.signature_version
1421 # Scoped config overrides picking from the endpoint metadata.
1422 if scoped_config is not None:
1423 # A given service may have service specific configuration in the
1424 # config file, so we need to check there as well.
1425 service_config = scoped_config.get(service_name)
1426 if service_config is not None and isinstance(service_config, dict):
1427 version = service_config.get('signature_version')
1428 if version:
1429 logger.debug(
1430 "Switching signature version for service %s "
1431 "to version %s based on config file override.",
1432 service_name,
1433 version,
1434 )
1435 return version
1436 return None