Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/client.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
15from botocore import (
16 UNSIGNED, # noqa: F401
17 waiter,
18 xform_name,
19)
20from botocore.args import ClientArgsCreator
21from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type
22from botocore.awsrequest import prepare_request_dict
23from botocore.compress import maybe_compress_request
24from botocore.config import Config
25from botocore.context import with_current_context
26from botocore.credentials import RefreshableCredentials
27from botocore.discovery import (
28 EndpointDiscoveryHandler,
29 EndpointDiscoveryManager,
30 block_endpoint_discovery_required_operations,
31)
32from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring
33from botocore.exceptions import (
34 ClientError, # noqa: F401
35 DataNotFoundError,
36 InvalidEndpointDiscoveryConfigurationError,
37 OperationNotPageableError,
38 UnknownServiceError,
39 UnknownSignatureVersionError,
40)
41from botocore.history import get_global_history_recorder
42from botocore.hooks import first_non_none_response
43from botocore.httpchecksum import (
44 apply_request_checksum,
45 resolve_checksum_context,
46)
47from botocore.model import ServiceModel
48from botocore.paginate import Paginator
49from botocore.retries import adaptive, standard
50from botocore.useragent import UserAgentString, register_feature_id
51from botocore.utils import (
52 CachedProperty,
53 EventbridgeSignerSetter,
54 S3ArnParamHandler, # noqa: F401
55 S3ControlArnParamHandler, # noqa: F401
56 S3ControlArnParamHandlerv2,
57 S3ControlEndpointSetter, # noqa: F401
58 S3EndpointSetter, # noqa: F401
59 S3ExpressIdentityResolver,
60 S3RegionRedirector, # noqa: F401
61 S3RegionRedirectorv2,
62 ensure_boolean,
63 get_service_module_name,
64)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Process-wide history recorder; used below to record 'API_CALL' events.
history_recorder = get_global_history_recorder()
70class ClientCreator:
71 """Creates client objects for a service."""
73 def __init__(
74 self,
75 loader,
76 endpoint_resolver,
77 user_agent,
78 event_emitter,
79 retry_handler_factory,
80 retry_config_translator,
81 response_parser_factory=None,
82 exceptions_factory=None,
83 config_store=None,
84 user_agent_creator=None,
85 auth_token_resolver=None,
86 ):
87 self._loader = loader
88 self._endpoint_resolver = endpoint_resolver
89 self._user_agent = user_agent
90 self._event_emitter = event_emitter
91 self._retry_handler_factory = retry_handler_factory
92 self._retry_config_translator = retry_config_translator
93 self._response_parser_factory = response_parser_factory
94 self._exceptions_factory = exceptions_factory
95 # TODO: Migrate things away from scoped_config in favor of the
96 # config_store. The config store can pull things from both the scoped
97 # config and environment variables (and potentially more in the
98 # future).
99 self._config_store = config_store
100 self._user_agent_creator = user_agent_creator
101 self._auth_token_resolver = auth_token_resolver
def create_client(
    self,
    service_name,
    region_name,
    is_secure=True,
    endpoint_url=None,
    verify=None,
    credentials=None,
    scoped_config=None,
    api_version=None,
    client_config=None,
    auth_token=None,
):
    """Build, wire up and return a client instance for ``service_name``.

    The service model (and, when available, the endpoint ruleset and
    partition data) is loaded, a client class is generated, constructor
    arguments are assembled, and retry / S3 / S3 Control / importexport /
    endpoint-discovery handlers are registered on the new client.
    """
    # Handlers of 'choose-service-name' may substitute another name.
    service_name = first_non_none_response(
        self._event_emitter.emit(
            'choose-service-name', service_name=service_name
        ),
        default=service_name,
    )
    service_model = self._load_service_model(service_name, api_version)

    # Both values stay None when the service has no endpoint ruleset.
    endpoints_ruleset_data = None
    partition_data = None
    try:
        loaded_ruleset = self._load_service_endpoints_ruleset(
            service_name, api_version
        )
        loaded_partitions = self._loader.load_data('partitions')
    except UnknownServiceError:
        logger.info(
            'No endpoints ruleset found for service %s, falling back to '
            'legacy endpoint routing.',
            service_name,
        )
    else:
        endpoints_ruleset_data = loaded_ruleset
        partition_data = loaded_partitions

    client_cls = self._create_client_class(service_name, service_model)
    region_name, client_config = self._normalize_fips_region(
        region_name, client_config
    )

    # Prefer the modeled 'auth' list over the legacy 'signatureVersion'.
    modeled_auth = service_model.metadata.get('auth')
    if modeled_auth:
        service_signature_version = resolve_auth_type(modeled_auth)
    else:
        service_signature_version = service_model.metadata.get(
            'signatureVersion'
        )

    endpoint_bridge = ClientEndpointBridge(
        self._endpoint_resolver,
        scoped_config,
        client_config,
        service_signing_name=service_model.metadata.get('signingName'),
        config_store=self._config_store,
        service_signature_version=service_signature_version,
    )

    # A truthy service-specific token overrides the caller-supplied one.
    service_token = self._evaluate_client_specific_token(
        service_model.signing_name
    )
    if service_token:
        auth_token = service_token

    client_args = self._get_client_args(
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    )
    service_client = client_cls(**client_args)

    self._register_retries(service_client)
    self._register_s3_events(
        client=service_client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=client_config,
        scoped_config=scoped_config,
    )
    self._register_s3express_events(client=service_client)
    self._register_s3_control_events(client=service_client)
    self._register_importexport_events(client=service_client)
    self._register_endpoint_discovery(
        service_client, endpoint_url, client_config
    )
    return service_client
def create_client_class(self, service_name, api_version=None):
    """Load the service model and return the generated client class."""
    return self._create_client_class(
        service_name,
        self._load_service_model(service_name, api_version),
    )
def _create_client_class(self, service_name, service_model):
    """Create a new client class for ``service_model``.

    The class attributes are the generated API methods plus the
    ``_PY_TO_OP_NAME`` mapping.  The 'creating-client-class.<service-id>'
    event is emitted with the mutable attribute dict and base list before
    the class is created with ``type()``.
    """
    attrs = self._create_methods(service_model)
    attrs['_PY_TO_OP_NAME'] = self._create_name_mapping(service_model)
    base_classes = [BaseClient]
    hyphenized_id = service_model.service_id.hyphenize()
    self._event_emitter.emit(
        f'creating-client-class.{hyphenized_id}',
        class_attributes=attrs,
        base_classes=base_classes,
    )
    module_name = get_service_module_name(service_model)
    return type(str(module_name), tuple(base_classes), attrs)
207 def _normalize_fips_region(self, region_name, client_config):
208 if region_name is not None:
209 normalized_region_name = region_name.replace('fips-', '').replace(
210 '-fips', ''
211 )
212 # If region has been transformed then set flag
213 if normalized_region_name != region_name:
214 config_use_fips_endpoint = Config(use_fips_endpoint=True)
215 if client_config:
216 # Keeping endpoint setting client specific
217 client_config = client_config.merge(
218 config_use_fips_endpoint
219 )
220 else:
221 client_config = config_use_fips_endpoint
222 logger.warning(
223 'transforming region from %s to %s and setting '
224 'use_fips_endpoint to true. client should not '
225 'be configured with a fips psuedo region.',
226 region_name,
227 normalized_region_name,
228 )
229 region_name = normalized_region_name
230 return region_name, client_config
def _load_service_model(self, service_name, api_version=None):
    """Load the 'service-2' JSON model and wrap it in a ``ServiceModel``."""
    return ServiceModel(
        self._loader.load_service_model(
            service_name, 'service-2', api_version=api_version
        ),
        service_name=service_name,
    )
239 def _load_service_endpoints_ruleset(self, service_name, api_version=None):
240 return self._loader.load_service_model(
241 service_name, 'endpoint-rule-set-1', api_version=api_version
242 )
244 def _register_retries(self, client):
245 retry_mode = client.meta.config.retries['mode']
246 if retry_mode == 'standard':
247 self._register_v2_standard_retries(client)
248 elif retry_mode == 'adaptive':
249 self._register_v2_standard_retries(client)
250 self._register_v2_adaptive_retries(client)
251 elif retry_mode == 'legacy':
252 self._register_legacy_retries(client)
253 else:
254 return
255 register_feature_id(f'RETRY_MODE_{retry_mode.upper()}')
def _register_v2_standard_retries(self, client):
    """Register the standard retry handler for ``client``.

    ``max_attempts`` is forwarded only when 'total_max_attempts' is set in
    the client's retry config.
    """
    total_attempts = client.meta.config.retries.get('total_max_attempts')
    if total_attempts is None:
        standard.register_retry_handler(client=client)
    else:
        standard.register_retry_handler(
            client=client, max_attempts=total_attempts
        )
def _register_v2_adaptive_retries(self, client):
    """Register the adaptive retry handler for ``client``."""
    adaptive.register_retry_handler(client)
def _register_legacy_retries(self, client):
    """Build and register the legacy 'needs-retry' handler for ``client``.

    Does nothing when the loader returns no '_retry' data.
    """
    service_model = client.meta.service_model
    endpoint_prefix = service_model.endpoint_prefix
    event_suffix = service_model.service_id.hyphenize()

    # The '_retry' data covers all services; the translator extracts the
    # part relevant to this endpoint prefix.
    all_retry_data = self._loader.load_data('_retry')
    if not all_retry_data:
        return

    legacy_retries = self._transform_legacy_retries(
        client.meta.config.retries
    )
    retry_config = self._retry_config_translator.build_retry_config(
        endpoint_prefix,
        all_retry_data.get('retry', {}),
        all_retry_data.get('definitions', {}),
        legacy_retries,
    )

    logger.debug(
        "Registering retry handlers for service: %s",
        client.meta.service_model.service_name,
    )
    retry_handler = self._retry_handler_factory.create_retry_handler(
        retry_config, endpoint_prefix
    )
    client.meta.events.register(
        f"needs-retry.{event_suffix}",
        retry_handler,
        unique_id=f'retry-config-{event_suffix}',
    )
298 def _transform_legacy_retries(self, retries):
299 if retries is None:
300 return
301 copied_args = retries.copy()
302 if 'total_max_attempts' in retries:
303 copied_args = retries.copy()
304 copied_args['max_attempts'] = (
305 copied_args.pop('total_max_attempts') - 1
306 )
307 return copied_args
309 def _get_retry_mode(self, client, config_store):
310 client_retries = client.meta.config.retries
311 if (
312 client_retries is not None
313 and client_retries.get('mode') is not None
314 ):
315 return client_retries['mode']
316 return config_store.get_config_variable('retry_mode') or 'legacy'
318 def _register_endpoint_discovery(self, client, endpoint_url, config):
319 if endpoint_url is not None:
320 # Don't register any handlers in the case of a custom endpoint url
321 return
322 # Only attach handlers if the service supports discovery
323 if client.meta.service_model.endpoint_discovery_operation is None:
324 return
325 events = client.meta.events
326 service_id = client.meta.service_model.service_id.hyphenize()
327 enabled = False
328 if config and config.endpoint_discovery_enabled is not None:
329 enabled = config.endpoint_discovery_enabled
330 elif self._config_store:
331 enabled = self._config_store.get_config_variable(
332 'endpoint_discovery_enabled'
333 )
335 enabled = self._normalize_endpoint_discovery_config(enabled)
336 if enabled and self._requires_endpoint_discovery(client, enabled):
337 discover = enabled is True
338 manager = EndpointDiscoveryManager(
339 client, always_discover=discover
340 )
341 handler = EndpointDiscoveryHandler(manager)
342 handler.register(events, service_id)
343 else:
344 events.register(
345 'before-parameter-build',
346 block_endpoint_discovery_required_operations,
347 )
349 def _normalize_endpoint_discovery_config(self, enabled):
350 """Config must either be a boolean-string or string-literal 'auto'"""
351 if isinstance(enabled, str):
352 enabled = enabled.lower().strip()
353 if enabled == 'auto':
354 return enabled
355 elif enabled in ('true', 'false'):
356 return ensure_boolean(enabled)
357 elif isinstance(enabled, bool):
358 return enabled
360 raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)
362 def _requires_endpoint_discovery(self, client, enabled):
363 if enabled == "auto":
364 return client.meta.service_model.endpoint_discovery_required
365 return enabled
367 def _register_eventbridge_events(
368 self, client, endpoint_bridge, endpoint_url
369 ):
370 if client.meta.service_model.service_name != 'events':
371 return
372 EventbridgeSignerSetter(
373 endpoint_resolver=self._endpoint_resolver,
374 region=client.meta.region_name,
375 endpoint_url=endpoint_url,
376 ).register(client.meta.events)
378 def _register_s3express_events(
379 self,
380 client,
381 endpoint_bridge=None,
382 endpoint_url=None,
383 client_config=None,
384 scoped_config=None,
385 ):
386 if client.meta.service_model.service_name != 's3':
387 return
388 S3ExpressIdentityResolver(client, RefreshableCredentials).register()
390 def _register_s3_events(
391 self,
392 client,
393 endpoint_bridge,
394 endpoint_url,
395 client_config,
396 scoped_config,
397 ):
398 if client.meta.service_model.service_name != 's3':
399 return
400 S3RegionRedirectorv2(None, client).register()
401 self._set_s3_presign_signature_version(
402 client.meta, client_config, scoped_config
403 )
404 client.meta.events.register(
405 'before-parameter-build.s3', self._inject_s3_input_parameters
406 )
408 def _register_s3_control_events(
409 self,
410 client,
411 endpoint_bridge=None,
412 endpoint_url=None,
413 client_config=None,
414 scoped_config=None,
415 ):
416 if client.meta.service_model.service_name != 's3control':
417 return
418 S3ControlArnParamHandlerv2().register(client.meta.events)
def _set_s3_presign_signature_version(
    self, client_meta, client_config, scoped_config
):
    """Default S3 presigning to sigv2 when that is safe to do.

    The 'choose-signer.s3' handler is registered only if no signature
    version was configured explicitly, the region is known (or is
    'aws-global'), and the resolved endpoint lists 's3' among its
    signature versions.
    """
    # A manually configured signature version always wins.
    explicit_version = _get_configured_signature_version(
        's3', client_config, scoped_config
    )
    if explicit_version is not None:
        return

    # Check to see if the region is a region that we know about. If we
    # don't know about a region, then we can safely assume it's a new
    # region that is sigv4 only, since all new S3 regions only allow sigv4.
    # The only exception is aws-global. This is a pseudo-region for the
    # global endpoint, we should respect the signature versions it
    # supports, which includes v2.
    resolver = self._endpoint_resolver
    known_regions = resolver.get_available_endpoints(
        's3', client_meta.partition
    )
    region = client_meta.region_name
    if region != 'aws-global' and region not in known_regions:
        return

    # If it is a region we know about, we want to default to sigv2, so here
    # we check to see if it is available.
    endpoint = resolver.construct_endpoint('s3', region)
    if 's3' not in endpoint['signatureVersions']:
        return

    # Known region, sigv2 supported, nothing configured: default to sigv2.
    client_meta.events.register(
        'choose-signer.s3', self._default_s3_presign_to_sigv2
    )
463 def _inject_s3_input_parameters(self, params, context, **kwargs):
464 context['input_params'] = {}
465 inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix')
466 for inject_parameter in inject_parameters:
467 if inject_parameter in params:
468 context['input_params'][inject_parameter] = params[
469 inject_parameter
470 ]
472 def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
473 """
474 Returns the 's3' (sigv2) signer if presigning an s3 request. This is
475 intended to be used to set the default signature version for the signer
476 to sigv2. Situations where an asymmetric signature is required are the
477 exception, for example MRAP needs v4a.
479 :type signature_version: str
480 :param signature_version: The current client signature version.
482 :type signing_name: str
483 :param signing_name: The signing name of the service.
485 :return: 's3' if the request is an s3 presign request, None otherwise
486 """
487 if signature_version.startswith('v4a'):
488 return
490 if signature_version.startswith('v4-s3express'):
491 return signature_version
493 for suffix in ['-query', '-presign-post']:
494 if signature_version.endswith(suffix):
495 return f's3{suffix}'
497 def _register_importexport_events(
498 self,
499 client,
500 endpoint_bridge=None,
501 endpoint_url=None,
502 client_config=None,
503 scoped_config=None,
504 ):
505 if client.meta.service_model.service_name != 'importexport':
506 return
507 self._set_importexport_signature_version(
508 client.meta, client_config, scoped_config
509 )
def _set_importexport_signature_version(
    self, client_meta, client_config, scoped_config
):
    """Default importexport signing to sigv4 unless a version is configured."""
    # A manually configured signature version always wins.
    explicit_version = _get_configured_signature_version(
        'importexport', client_config, scoped_config
    )
    if explicit_version is None:
        # importexport has a modeled signatureVersion of v2, but we
        # previously switched to v4 via endpoint.json before endpoint
        # rulesets.  Override the model's signatureVersion for backwards
        # compatibility.
        client_meta.events.register(
            'choose-signer.importexport', self._default_signer_to_sigv4
        )
530 def _default_signer_to_sigv4(self, signature_version, **kwargs):
531 return 'v4'
def _get_client_args(
    self,
    service_model,
    region_name,
    is_secure,
    endpoint_url,
    verify,
    credentials,
    scoped_config,
    client_config,
    endpoint_bridge,
    auth_token,
    endpoints_ruleset_data,
    partition_data,
):
    """Build the client constructor arguments via ``ClientArgsCreator``.

    All parameters are forwarded positionally, in order, to
    ``ClientArgsCreator.get_client_args``.
    """
    creator = ClientArgsCreator(
        self._event_emitter,
        self._user_agent,
        self._response_parser_factory,
        self._loader,
        self._exceptions_factory,
        config_store=self._config_store,
        user_agent_creator=self._user_agent_creator,
    )
    forwarded = (
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    )
    return creator.get_client_args(*forwarded)
def _create_methods(self, service_model):
    """Return a dict of python method name -> generated API method."""
    methods = {}
    for api_name in service_model.operation_names:
        snake_name = xform_name(api_name)
        methods[snake_name] = self._create_api_method(
            snake_name, api_name, service_model
        )
    return methods
def _create_name_mapping(self, service_model):
    """Return the py_name -> OperationName mapping for every operation."""
    return {
        xform_name(api_name): api_name
        for api_name in service_model.operation_names
    }
def _create_api_method(
    self, py_operation_name, operation_name, service_model
):
    """Create the client method that invokes ``operation_name``.

    The returned function is named ``py_operation_name``, accepts keyword
    arguments only, and carries a ``ClientMethodDocstring`` as its
    ``__doc__``.
    """

    def _api_call(self, *args, **kwargs):
        # *args is accepted only so the error below is more helpful than
        # the default "takes exactly 1 argument" TypeError.
        if args:
            raise TypeError(
                f"{py_operation_name}() only accepts keyword arguments."
            )
        # The "self" in this scope is referring to the BaseClient.
        return self._make_api_call(operation_name, kwargs)

    _api_call.__name__ = str(py_operation_name)

    # Add the docstring to the client method
    op_model = service_model.operation_model(operation_name)
    _api_call.__doc__ = ClientMethodDocstring(
        operation_model=op_model,
        method_name=operation_name,
        event_emitter=self._event_emitter,
        method_description=op_model.documentation,
        example_prefix=f'response = client.{py_operation_name}',
        include_signature=False,
    )
    return _api_call
619 def _evaluate_client_specific_token(self, signing_name):
620 # Resolves an auth_token for the given signing_name.
621 # Returns None if no resolver is set or if resolution fails.
622 resolver = self._auth_token_resolver
623 if not resolver or not signing_name:
624 return None
626 return resolver(signing_name=signing_name)
629class ClientEndpointBridge:
630 """Bridges endpoint data and client creation
632 This class handles taking out the relevant arguments from the endpoint
633 resolver and determining which values to use, taking into account any
634 client configuration options and scope configuration options.
636 This class also handles determining what, if any, region to use if no
637 explicit region setting is provided. For example, Amazon S3 client will
638 utilize "us-east-1" by default if no region can be resolved."""
640 DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
641 _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']
643 def __init__(
644 self,
645 endpoint_resolver,
646 scoped_config=None,
647 client_config=None,
648 default_endpoint=None,
649 service_signing_name=None,
650 config_store=None,
651 service_signature_version=None,
652 ):
653 self.service_signing_name = service_signing_name
654 self.endpoint_resolver = endpoint_resolver
655 self.scoped_config = scoped_config
656 self.client_config = client_config
657 self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
658 self.config_store = config_store
659 self.service_signature_version = service_signature_version
def resolve(
    self, service_name, region_name=None, endpoint_url=None, is_secure=True
):
    """Resolve endpoint information for ``service_name``.

    The endpoint resolver is tried first without a partition and then,
    if nothing was found, with the 'aws' partition.  A resolved endpoint
    is turned into a result via ``_create_endpoint``; otherwise the
    endpoint is assumed via ``_assume_endpoint``.
    """
    region_name = self._check_default_region(service_name, region_name)
    variant_flags = {
        'use_dualstack_endpoint': self._resolve_use_dualstack_endpoint(
            service_name
        ),
        'use_fips_endpoint': self._resolve_endpoint_variant_config_var(
            'use_fips_endpoint'
        ),
    }
    construct = self.endpoint_resolver.construct_endpoint
    resolved = construct(service_name, region_name, **variant_flags)

    # If we can't resolve the region, we'll attempt to get a global
    # endpoint for non-regionalized services (iam, route53, etc)
    if not resolved:
        # TODO: fallback partition_name should be configurable in the
        # future for users to define as needed.
        resolved = construct(
            service_name,
            region_name,
            partition_name='aws',
            **variant_flags,
        )

    if not resolved:
        return self._assume_endpoint(
            service_name, region_name, endpoint_url, is_secure
        )
    return self._create_endpoint(
        resolved, service_name, region_name, endpoint_url, is_secure
    )
def resolver_uses_builtin_data(self):
    """Return the endpoint resolver's ``uses_builtin_data`` attribute."""
    resolver = self.endpoint_resolver
    return resolver.uses_builtin_data
703 def _check_default_region(self, service_name, region_name):
704 if region_name is not None:
705 return region_name
706 # Use the client_config region if no explicit region was provided.
707 if self.client_config and self.client_config.region_name is not None:
708 return self.client_config.region_name
710 def _create_endpoint(
711 self, resolved, service_name, region_name, endpoint_url, is_secure
712 ):
713 region_name, signing_region = self._pick_region_values(
714 resolved, region_name, endpoint_url
715 )
716 if endpoint_url is None:
717 endpoint_url = self._make_url(
718 resolved.get('hostname'),
719 is_secure,
720 resolved.get('protocols', []),
721 )
722 signature_version = self._resolve_signature_version(
723 service_name, resolved
724 )
725 signing_name = self._resolve_signing_name(service_name, resolved)
726 return self._create_result(
727 service_name=service_name,
728 region_name=region_name,
729 signing_region=signing_region,
730 signing_name=signing_name,
731 endpoint_url=endpoint_url,
732 metadata=resolved,
733 signature_version=signature_version,
734 )
736 def _resolve_endpoint_variant_config_var(self, config_var):
737 client_config = self.client_config
738 config_val = False
740 # Client configuration arg has precedence
741 if client_config and getattr(client_config, config_var) is not None:
742 return getattr(client_config, config_var)
743 elif self.config_store is not None:
744 # Check config store
745 config_val = self.config_store.get_config_variable(config_var)
746 return config_val
748 def _resolve_use_dualstack_endpoint(self, service_name):
749 s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
750 if s3_dualstack_mode is not None:
751 return s3_dualstack_mode
752 return self._resolve_endpoint_variant_config_var(
753 'use_dualstack_endpoint'
754 )
756 def _is_s3_dualstack_mode(self, service_name):
757 if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
758 return None
759 # TODO: This normalization logic is duplicated from the
760 # ClientArgsCreator class. Consolidate everything to
761 # ClientArgsCreator. _resolve_signature_version also has similarly
762 # duplicated logic.
763 client_config = self.client_config
764 if (
765 client_config is not None
766 and client_config.s3 is not None
767 and 'use_dualstack_endpoint' in client_config.s3
768 ):
769 # Client config trumps scoped config.
770 return client_config.s3['use_dualstack_endpoint']
771 if self.scoped_config is not None:
772 enabled = self.scoped_config.get('s3', {}).get(
773 'use_dualstack_endpoint'
774 )
775 if enabled in [True, 'True', 'true']:
776 return True
def _assume_endpoint(
    self, service_name, region_name, endpoint_url, is_secure
):
    """Build a result for an endpoint the resolver could not resolve.

    Without an explicit ``endpoint_url`` the default hostname template is
    expanded with the service and region.  The signing region is the
    given region and the metadata is empty.
    """
    if endpoint_url is None:
        # Expand the default hostname URI template.
        assumed_host = self.default_endpoint.format(
            service=service_name, region=region_name
        )
        endpoint_url = self._make_url(
            assumed_host, is_secure, ['http', 'https']
        )
    logger.debug(
        'Assuming an endpoint for %s, %s: %s',
        service_name,
        region_name,
        endpoint_url,
    )
    # We still want to allow the user to provide an explicit version.
    signature_version = self._resolve_signature_version(
        service_name, {'signatureVersions': ['v4']}
    )
    signing_name = self._resolve_signing_name(service_name, resolved={})
    result_fields = {
        'service_name': service_name,
        'region_name': region_name,
        'signing_region': region_name,
        'signing_name': signing_name,
        'signature_version': signature_version,
        'endpoint_url': endpoint_url,
        'metadata': {},
    }
    return self._create_result(**result_fields)
810 def _create_result(
811 self,
812 service_name,
813 region_name,
814 signing_region,
815 signing_name,
816 endpoint_url,
817 signature_version,
818 metadata,
819 ):
820 return {
821 'service_name': service_name,
822 'region_name': region_name,
823 'signing_region': signing_region,
824 'signing_name': signing_name,
825 'endpoint_url': endpoint_url,
826 'signature_version': signature_version,
827 'metadata': metadata,
828 }
830 def _make_url(self, hostname, is_secure, supported_protocols):
831 if is_secure and 'https' in supported_protocols:
832 scheme = 'https'
833 else:
834 scheme = 'http'
835 return f'{scheme}://{hostname}'
837 def _resolve_signing_name(self, service_name, resolved):
838 # CredentialScope overrides everything else.
839 if (
840 'credentialScope' in resolved
841 and 'service' in resolved['credentialScope']
842 ):
843 return resolved['credentialScope']['service']
844 # Use the signingName from the model if present.
845 if self.service_signing_name:
846 return self.service_signing_name
847 # Just assume is the same as the service name.
848 return service_name
850 def _pick_region_values(self, resolved, region_name, endpoint_url):
851 signing_region = region_name
852 if endpoint_url is None:
853 # Do not use the region name or signing name from the resolved
854 # endpoint if the user explicitly provides an endpoint_url. This
855 # would happen if we resolve to an endpoint where the service has
856 # a "defaults" section that overrides all endpoint with a single
857 # hostname and credentialScope. This has been the case historically
858 # for how STS has worked. The only way to resolve an STS endpoint
859 # was to provide a region_name and an endpoint_url. In that case,
860 # we would still resolve an endpoint, but we would not use the
861 # resolved endpointName or signingRegion because we want to allow
862 # custom endpoints.
863 region_name = resolved['endpointName']
864 signing_region = region_name
865 if (
866 'credentialScope' in resolved
867 and 'region' in resolved['credentialScope']
868 ):
869 signing_region = resolved['credentialScope']['region']
870 return region_name, signing_region
def _resolve_signature_version(self, service_name, resolved):
    """Choose the signature version for ``service_name``.

    Precedence: an explicitly configured version, 's3v4' for 's3' and
    's3-control', then the candidates from the service model (or, failing
    that, the resolved endpoint's 'signatureVersions').  Among the
    candidates 'v4' wins, followed by the first one present in
    ``AUTH_TYPE_MAPS``.

    :raises UnknownSignatureVersionError: when no candidate is known.
    """
    explicit_version = _get_configured_signature_version(
        service_name, self.client_config, self.scoped_config
    )
    if explicit_version is not None:
        return explicit_version

    # These have since added the "auth" key to the service model
    # with "aws.auth#sigv4", but preserve existing behavior from
    # when we preferred endpoints.json over the service models
    if service_name in ('s3', 's3-control'):
        return 's3v4'

    modeled_version = self.service_signature_version
    if modeled_version is None:
        # Fall back to endpoints.json to preserve existing behavior, which
        # may be useful for users who have custom service models
        candidates = resolved.get('signatureVersions', [])
    else:
        # Prefer the service model
        candidates = [modeled_version]

    # This was added for the V2 -> V4 transition,
    # for services that added V4 after V2 in endpoints.json
    if 'v4' in candidates:
        return 'v4'
    # Otherwise take the first candidate that is known to Botocore.
    for candidate in candidates:
        if candidate in AUTH_TYPE_MAPS:
            return candidate

    raise UnknownSignatureVersionError(signature_version=candidates)
907class BaseClient:
908 # This is actually reassigned with the py->op_name mapping
909 # when the client creator creates the subclass. This value is used
910 # because calls such as client.get_paginator('list_objects') use the
911 # snake_case name, but we need to know the ListObjects form.
912 # xform_name() does the ListObjects->list_objects conversion, but
913 # we need the reverse mapping here.
914 _PY_TO_OP_NAME = {}
def __init__(
    self,
    serializer,
    endpoint,
    response_parser,
    event_emitter,
    request_signer,
    service_model,
    loader,
    client_config,
    partition,
    exceptions_factory,
    endpoint_ruleset_resolver=None,
    user_agent_creator=None,
):
    """Store the client's collaborators, build ``meta`` and register handlers.

    When no ``user_agent_creator`` is given, one is built from the
    environment and the client config.
    """
    # Request / response pipeline.
    self._serializer = serializer
    self._endpoint = endpoint
    self._ruleset_resolver = endpoint_ruleset_resolver
    self._response_parser = response_parser
    self._request_signer = request_signer

    self._cache = {}
    self._loader = loader
    self._client_config = client_config
    self.meta = ClientMeta(
        event_emitter,
        self._client_config,
        endpoint.host,
        service_model,
        self._PY_TO_OP_NAME,
        partition,
    )
    self._exceptions_factory = exceptions_factory
    self._exceptions = None

    if user_agent_creator is None:
        user_agent_creator = (
            UserAgentString.from_environment().with_client_config(
                self._client_config
            )
        )
    self._user_agent_creator = user_agent_creator
    self._register_handlers()
958 def __getattr__(self, item):
959 service_id = self._service_model.service_id.hyphenize()
960 event_name = f'getattr.{service_id}.{item}'
962 handler, event_response = self.meta.events.emit_until_response(
963 event_name, client=self
964 )
966 if event_response is not None:
967 return event_response
969 raise AttributeError(
970 f"'{self.__class__.__name__}' object has no attribute '{item}'"
971 )
973 def close(self):
974 """Closes underlying endpoint connections."""
975 self._endpoint.close()
977 def _register_handlers(self):
978 # Register the handler required to sign requests.
979 service_id = self.meta.service_model.service_id.hyphenize()
980 self.meta.events.register(
981 f"request-created.{service_id}", self._request_signer.handler
982 )
983 # Rebuild user agent string right before request is sent
984 # to ensure all registered features are included.
985 self.meta.events.register_last(
986 f"request-created.{service_id}",
987 self._user_agent_creator.rebuild_and_replace_user_agent_handler,
988 )
990 @property
991 def _service_model(self):
992 return self.meta.service_model
994 @with_current_context()
995 def _make_api_call(self, operation_name, api_params):
996 operation_model = self._service_model.operation_model(operation_name)
997 service_name = self._service_model.service_name
998 history_recorder.record(
999 'API_CALL',
1000 {
1001 'service': service_name,
1002 'operation': operation_name,
1003 'params': api_params,
1004 },
1005 )
1006 if operation_model.deprecated:
1007 logger.debug(
1008 'Warning: %s.%s() is deprecated', service_name, operation_name
1009 )
1010 request_context = {
1011 'client_region': self.meta.region_name,
1012 'client_config': self.meta.config,
1013 'has_streaming_input': operation_model.has_streaming_input,
1014 'auth_type': operation_model.resolved_auth_type,
1015 'unsigned_payload': operation_model.unsigned_payload,
1016 'auth_options': self._service_model.metadata.get('auth'),
1017 }
1019 api_params = self._emit_api_params(
1020 api_params=api_params,
1021 operation_model=operation_model,
1022 context=request_context,
1023 )
1024 (
1025 endpoint_url,
1026 additional_headers,
1027 properties,
1028 ) = self._resolve_endpoint_ruleset(
1029 operation_model, api_params, request_context
1030 )
1031 if properties:
1032 # Pass arbitrary endpoint info with the Request
1033 # for use during construction.
1034 request_context['endpoint_properties'] = properties
1035 request_dict = self._convert_to_request_dict(
1036 api_params=api_params,
1037 operation_model=operation_model,
1038 endpoint_url=endpoint_url,
1039 context=request_context,
1040 headers=additional_headers,
1041 )
1042 resolve_checksum_context(request_dict, operation_model, api_params)
1044 service_id = self._service_model.service_id.hyphenize()
1045 handler, event_response = self.meta.events.emit_until_response(
1046 f'before-call.{service_id}.{operation_name}',
1047 model=operation_model,
1048 params=request_dict,
1049 request_signer=self._request_signer,
1050 context=request_context,
1051 )
1053 if event_response is not None:
1054 http, parsed_response = event_response
1055 else:
1056 maybe_compress_request(
1057 self.meta.config, request_dict, operation_model
1058 )
1059 apply_request_checksum(request_dict)
1060 http, parsed_response = self._make_request(
1061 operation_model, request_dict, request_context
1062 )
1064 self.meta.events.emit(
1065 f'after-call.{service_id}.{operation_name}',
1066 http_response=http,
1067 parsed=parsed_response,
1068 model=operation_model,
1069 context=request_context,
1070 )
1072 if http.status_code >= 300:
1073 error_info = parsed_response.get("Error", {})
1074 error_code = request_context.get(
1075 'error_code_override'
1076 ) or error_info.get("Code")
1077 error_class = self.exceptions.from_code(error_code)
1078 raise error_class(parsed_response, operation_name)
1079 else:
1080 return parsed_response
1082 def _make_request(self, operation_model, request_dict, request_context):
1083 try:
1084 return self._endpoint.make_request(operation_model, request_dict)
1085 except Exception as e:
1086 self.meta.events.emit(
1087 f'after-call-error.{self._service_model.service_id.hyphenize()}.{operation_model.name}',
1088 exception=e,
1089 context=request_context,
1090 )
1091 raise
1093 def _convert_to_request_dict(
1094 self,
1095 api_params,
1096 operation_model,
1097 endpoint_url,
1098 context=None,
1099 headers=None,
1100 set_user_agent_header=True,
1101 ):
1102 request_dict = self._serializer.serialize_to_request(
1103 api_params, operation_model
1104 )
1105 if not self._client_config.inject_host_prefix:
1106 request_dict.pop('host_prefix', None)
1107 if headers is not None:
1108 request_dict['headers'].update(headers)
1109 if set_user_agent_header:
1110 user_agent = self._user_agent_creator.to_string()
1111 else:
1112 user_agent = None
1113 prepare_request_dict(
1114 request_dict,
1115 endpoint_url=endpoint_url,
1116 user_agent=user_agent,
1117 context=context,
1118 )
1119 return request_dict
1121 def _emit_api_params(self, api_params, operation_model, context):
1122 # Given the API params provided by the user and the operation_model
1123 # we can serialize the request to a request_dict.
1124 operation_name = operation_model.name
1126 # Emit an event that allows users to modify the parameters at the
1127 # beginning of the method. It allows handlers to modify existing
1128 # parameters or return a new set of parameters to use.
1129 service_id = self._service_model.service_id.hyphenize()
1130 responses = self.meta.events.emit(
1131 f'provide-client-params.{service_id}.{operation_name}',
1132 params=api_params,
1133 model=operation_model,
1134 context=context,
1135 )
1136 api_params = first_non_none_response(responses, default=api_params)
1138 self.meta.events.emit(
1139 f'before-parameter-build.{service_id}.{operation_name}',
1140 params=api_params,
1141 model=operation_model,
1142 context=context,
1143 )
1144 return api_params
1146 def _resolve_endpoint_ruleset(
1147 self,
1148 operation_model,
1149 params,
1150 request_context,
1151 ignore_signing_region=False,
1152 ):
1153 """Returns endpoint URL and list of additional headers returned from
1154 EndpointRulesetResolver for the given operation and params. If the
1155 ruleset resolver is not available, for example because the service has
1156 no endpoints ruleset file, the legacy endpoint resolver's value is
1157 returned.
1159 Use ignore_signing_region for generating presigned URLs or any other
1160 situation where the signing region information from the ruleset
1161 resolver should be ignored.
1163 Returns tuple of URL and headers dictionary. Additionally, the
1164 request_context dict is modified in place with any signing information
1165 returned from the ruleset resolver.
1166 """
1167 if self._ruleset_resolver is None:
1168 endpoint_url = self.meta.endpoint_url
1169 additional_headers = {}
1170 endpoint_properties = {}
1171 else:
1172 endpoint_info = self._ruleset_resolver.construct_endpoint(
1173 operation_model=operation_model,
1174 call_args=params,
1175 request_context=request_context,
1176 )
1177 endpoint_url = endpoint_info.url
1178 additional_headers = endpoint_info.headers
1179 endpoint_properties = endpoint_info.properties
1180 # If authSchemes is present, overwrite default auth type and
1181 # signing context derived from service model.
1182 auth_schemes = endpoint_info.properties.get('authSchemes')
1183 if auth_schemes is not None:
1184 auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx(
1185 auth_schemes
1186 )
1187 auth_type, signing_context = auth_info
1188 request_context['auth_type'] = auth_type
1189 if 'region' in signing_context and ignore_signing_region:
1190 del signing_context['region']
1191 if 'signing' in request_context:
1192 request_context['signing'].update(signing_context)
1193 else:
1194 request_context['signing'] = signing_context
1196 return endpoint_url, additional_headers, endpoint_properties
1198 def get_paginator(self, operation_name):
1199 """Create a paginator for an operation.
1201 :type operation_name: string
1202 :param operation_name: The operation name. This is the same name
1203 as the method name on the client. For example, if the
1204 method name is ``create_foo``, and you'd normally invoke the
1205 operation as ``client.create_foo(**kwargs)``, if the
1206 ``create_foo`` operation can be paginated, you can use the
1207 call ``client.get_paginator("create_foo")``.
1209 :raise OperationNotPageableError: Raised if the operation is not
1210 pageable. You can use the ``client.can_paginate`` method to
1211 check if an operation is pageable.
1213 :rtype: ``botocore.paginate.Paginator``
1214 :return: A paginator object.
1216 """
1217 if not self.can_paginate(operation_name):
1218 raise OperationNotPageableError(operation_name=operation_name)
1219 else:
1220 actual_operation_name = self._PY_TO_OP_NAME[operation_name]
1222 # Create a new paginate method that will serve as a proxy to
1223 # the underlying Paginator.paginate method. This is needed to
1224 # attach a docstring to the method.
1225 def paginate(self, **kwargs):
1226 return Paginator.paginate(self, **kwargs)
1228 paginator_config = self._cache['page_config'][
1229 actual_operation_name
1230 ]
1231 # Add the docstring for the paginate method.
1232 paginate.__doc__ = PaginatorDocstring(
1233 paginator_name=actual_operation_name,
1234 event_emitter=self.meta.events,
1235 service_model=self.meta.service_model,
1236 paginator_config=paginator_config,
1237 include_signature=False,
1238 )
1240 # Rename the paginator class based on the type of paginator.
1241 service_module_name = get_service_module_name(
1242 self.meta.service_model
1243 )
1244 paginator_class_name = (
1245 f"{service_module_name}.Paginator.{actual_operation_name}"
1246 )
1248 # Create the new paginator class
1249 documented_paginator_cls = type(
1250 paginator_class_name, (Paginator,), {'paginate': paginate}
1251 )
1253 operation_model = self._service_model.operation_model(
1254 actual_operation_name
1255 )
1256 paginator = documented_paginator_cls(
1257 getattr(self, operation_name),
1258 paginator_config,
1259 operation_model,
1260 )
1261 return paginator
1263 def can_paginate(self, operation_name):
1264 """Check if an operation can be paginated.
1266 :type operation_name: string
1267 :param operation_name: The operation name. This is the same name
1268 as the method name on the client. For example, if the
1269 method name is ``create_foo``, and you'd normally invoke the
1270 operation as ``client.create_foo(**kwargs)``, if the
1271 ``create_foo`` operation can be paginated, you can use the
1272 call ``client.get_paginator("create_foo")``.
1274 :return: ``True`` if the operation can be paginated,
1275 ``False`` otherwise.
1277 """
1278 if 'page_config' not in self._cache:
1279 try:
1280 page_config = self._loader.load_service_model(
1281 self._service_model.service_name,
1282 'paginators-1',
1283 self._service_model.api_version,
1284 )['pagination']
1285 self._cache['page_config'] = page_config
1286 except DataNotFoundError:
1287 self._cache['page_config'] = {}
1288 actual_operation_name = self._PY_TO_OP_NAME[operation_name]
1289 return actual_operation_name in self._cache['page_config']
1291 def _get_waiter_config(self):
1292 if 'waiter_config' not in self._cache:
1293 try:
1294 waiter_config = self._loader.load_service_model(
1295 self._service_model.service_name,
1296 'waiters-2',
1297 self._service_model.api_version,
1298 )
1299 self._cache['waiter_config'] = waiter_config
1300 except DataNotFoundError:
1301 self._cache['waiter_config'] = {}
1302 return self._cache['waiter_config']
1304 def get_waiter(self, waiter_name):
1305 """Returns an object that can wait for some condition.
1307 :type waiter_name: str
1308 :param waiter_name: The name of the waiter to get. See the waiters
1309 section of the service docs for a list of available waiters.
1311 :returns: The specified waiter object.
1312 :rtype: ``botocore.waiter.Waiter``
1313 """
1314 config = self._get_waiter_config()
1315 if not config:
1316 raise ValueError(f"Waiter does not exist: {waiter_name}")
1317 model = waiter.WaiterModel(config)
1318 mapping = {}
1319 for name in model.waiter_names:
1320 mapping[xform_name(name)] = name
1321 if waiter_name not in mapping:
1322 raise ValueError(f"Waiter does not exist: {waiter_name}")
1324 return waiter.create_waiter_with_client(
1325 mapping[waiter_name], model, self
1326 )
1328 @CachedProperty
1329 def waiter_names(self):
1330 """Returns a list of all available waiters."""
1331 config = self._get_waiter_config()
1332 if not config:
1333 return []
1334 model = waiter.WaiterModel(config)
1335 # Waiter configs is a dict, we just want the waiter names
1336 # which are the keys in the dict.
1337 return [xform_name(name) for name in model.waiter_names]
1339 @property
1340 def exceptions(self):
1341 if self._exceptions is None:
1342 self._exceptions = self._load_exceptions()
1343 return self._exceptions
1345 def _load_exceptions(self):
1346 return self._exceptions_factory.create_client_exceptions(
1347 self._service_model
1348 )
1350 def _get_credentials(self):
1351 """
1352 This private interface is subject to abrupt breaking changes, including
1353 removal, in any botocore release.
1354 """
1355 return self._request_signer._credentials
class ClientMeta:
    """Read-only holder for client-level information.

    The class serves two purposes:

    * It gives clients a place for advanced functionality.
    * It keeps these attributes in their own namespace, apart from the
      operation methods that are attached to a client at runtime, so an
      attribute can never collide with an operation name.

    """

    def __init__(
        self,
        events,
        client_config,
        endpoint_url,
        service_model,
        method_to_api_mapping,
        partition,
    ):
        """Store the given values; ``events`` is the only public attribute."""
        self.events = events
        self._service_model = service_model
        self._client_config = client_config
        self._endpoint_url = endpoint_url
        self._partition = partition
        self._method_to_api_mapping = method_to_api_mapping

    @property
    def config(self):
        """The client configuration object passed at construction."""
        return self._client_config

    @property
    def region_name(self):
        """The ``region_name`` attribute of the client configuration."""
        return self._client_config.region_name

    @property
    def endpoint_url(self):
        """The endpoint URL passed at construction."""
        return self._endpoint_url

    @property
    def partition(self):
        """The partition passed at construction."""
        return self._partition

    @property
    def service_model(self):
        """The service model passed at construction."""
        return self._service_model

    @property
    def method_to_api_mapping(self):
        """The method-name to API-name mapping passed at construction."""
        return self._method_to_api_mapping
1412def _get_configured_signature_version(
1413 service_name, client_config, scoped_config
1414):
1415 """
1416 Gets the manually configured signature version.
1418 :returns: the customer configured signature version, or None if no
1419 signature version was configured.
1420 """
1421 # Client config overrides everything.
1422 if client_config and client_config.signature_version is not None:
1423 return client_config.signature_version
1425 # Scoped config overrides picking from the endpoint metadata.
1426 if scoped_config is not None:
1427 # A given service may have service specific configuration in the
1428 # config file, so we need to check there as well.
1429 service_config = scoped_config.get(service_name)
1430 if service_config is not None and isinstance(service_config, dict):
1431 version = service_config.get('signature_version')
1432 if version:
1433 logger.debug(
1434 "Switching signature version for service %s "
1435 "to version %s based on config file override.",
1436 service_name,
1437 version,
1438 )
1439 return version
1440 return None