Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/client.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
13import logging
15from botocore import waiter, xform_name
16from botocore.args import ClientArgsCreator
17from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type
18from botocore.awsrequest import prepare_request_dict
19from botocore.compress import maybe_compress_request
20from botocore.config import Config
21from botocore.credentials import RefreshableCredentials
22from botocore.discovery import (
23 EndpointDiscoveryHandler,
24 EndpointDiscoveryManager,
25 block_endpoint_discovery_required_operations,
26)
27from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring
28from botocore.exceptions import (
29 DataNotFoundError,
30 InvalidEndpointDiscoveryConfigurationError,
31 OperationNotPageableError,
32 UnknownServiceError,
33 UnknownSignatureVersionError,
34)
35from botocore.history import get_global_history_recorder
36from botocore.hooks import first_non_none_response
37from botocore.httpchecksum import (
38 apply_request_checksum,
39 resolve_checksum_context,
40)
41from botocore.model import ServiceModel
42from botocore.paginate import Paginator
43from botocore.retries import adaptive, standard
44from botocore.useragent import UserAgentString
45from botocore.utils import (
46 CachedProperty,
47 EventbridgeSignerSetter,
48 S3ControlArnParamHandlerv2,
49 S3ExpressIdentityResolver,
50 S3RegionRedirectorv2,
51 ensure_boolean,
52 get_service_module_name,
53)
55# Keep these imported. There's pre-existing code that uses:
56# "from botocore.client import UNSIGNED"
57# "from botocore.client import ClientError"
58# etc.
59from botocore.exceptions import ClientError # noqa
60from botocore.utils import S3ArnParamHandler # noqa
61from botocore.utils import S3ControlArnParamHandler # noqa
62from botocore.utils import S3ControlEndpointSetter # noqa
63from botocore.utils import S3EndpointSetter # noqa
64from botocore.utils import S3RegionRedirector # noqa
65from botocore import UNSIGNED # noqa
# Signature versions treated as "legacy". When the service model names a
# signature version that is NOT in this set,
# ClientEndpointBridge._resolve_signature_version prefers the model's value
# over the versions listed in the resolved endpoint metadata.
_LEGACY_SIGNATURE_VERSIONS = frozenset(
    ['v2', 'v3', 'v3https', 'v4', 's3', 's3v4']
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Process-wide history recorder; BaseClient._make_api_call records an
# 'API_CALL' entry on it for every operation that is invoked.
history_recorder = get_global_history_recorder()
class ClientCreator:
    """Creates client objects for a service.

    The creator holds the collaborators shared by every client it builds
    (data loader, endpoint resolver, event emitter, retry factories, ...)
    and uses them to generate a dedicated ``BaseClient`` subclass, and an
    instance of that subclass, for each requested service.
    """

    def __init__(
        self,
        loader,
        endpoint_resolver,
        user_agent,
        event_emitter,
        retry_handler_factory,
        retry_config_translator,
        response_parser_factory=None,
        exceptions_factory=None,
        config_store=None,
        user_agent_creator=None,
    ):
        """Store the collaborators used when building clients.

        :param loader: Data loader used for service models, endpoint
            rulesets, partition data and the legacy ``_retry`` config.
        :param endpoint_resolver: Legacy endpoint resolver; handed to
            ``ClientEndpointBridge`` and used for S3 presign defaults.
        :param user_agent: Forwarded to ``ClientArgsCreator``.
        :param event_emitter: Emitter for the ``choose-service-name`` and
            ``creating-client-class.*`` events; also forwarded to
            ``ClientArgsCreator``.
        :param retry_handler_factory: Creates the legacy retry handler.
        :param retry_config_translator: Builds the legacy retry config.
        :param response_parser_factory: Forwarded to ``ClientArgsCreator``.
        :param exceptions_factory: Forwarded to ``ClientArgsCreator``.
        :param config_store: Optional config store used for endpoint
            discovery settings and forwarded to other collaborators.
        :param user_agent_creator: Forwarded to ``ClientArgsCreator``.
        """
        self._loader = loader
        self._endpoint_resolver = endpoint_resolver
        self._user_agent = user_agent
        self._event_emitter = event_emitter
        self._retry_handler_factory = retry_handler_factory
        self._retry_config_translator = retry_config_translator
        self._response_parser_factory = response_parser_factory
        self._exceptions_factory = exceptions_factory
        # TODO: Migrate things away from scoped_config in favor of the
        # config_store.  The config store can pull things from both the scoped
        # config and environment variables (and potentially more in the
        # future).
        self._config_store = config_store
        self._user_agent_creator = user_agent_creator

    def create_client(
        self,
        service_name,
        region_name,
        is_secure=True,
        endpoint_url=None,
        verify=None,
        credentials=None,
        scoped_config=None,
        api_version=None,
        client_config=None,
        auth_token=None,
    ):
        """Build and return a fully wired client for ``service_name``.

        The service name may be rewritten by ``choose-service-name``
        handlers.  The method then loads the service model (and, when
        available, the endpoint ruleset and partition data), generates the
        client class, normalizes FIPS pseudo regions, instantiates the
        client and registers retry, S3, S3 Express, S3 Control and endpoint
        discovery handlers on it.

        :return: An instance of the generated client class.
        """
        responses = self._event_emitter.emit(
            'choose-service-name', service_name=service_name
        )
        service_name = first_non_none_response(responses, default=service_name)
        service_model = self._load_service_model(service_name, api_version)
        try:
            endpoints_ruleset_data = self._load_service_endpoints_ruleset(
                service_name, api_version
            )
            partition_data = self._loader.load_data('partitions')
        except UnknownServiceError:
            # No ruleset is available for this service: the client will
            # rely on the legacy endpoint resolver instead.
            endpoints_ruleset_data = None
            partition_data = None
            logger.info(
                'No endpoints ruleset found for service %s, falling back to '
                'legacy endpoint routing.',
                service_name,
            )

        cls = self._create_client_class(service_name, service_model)
        region_name, client_config = self._normalize_fips_region(
            region_name, client_config
        )
        # The modeled 'auth' list takes precedence over the older
        # 'signatureVersion' metadata key.
        if auth := service_model.metadata.get('auth'):
            service_signature_version = resolve_auth_type(auth)
        else:
            service_signature_version = service_model.metadata.get(
                'signatureVersion'
            )
        endpoint_bridge = ClientEndpointBridge(
            self._endpoint_resolver,
            scoped_config,
            client_config,
            service_signing_name=service_model.metadata.get('signingName'),
            config_store=self._config_store,
            service_signature_version=service_signature_version,
        )
        client_args = self._get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )
        service_client = cls(**client_args)
        self._register_retries(service_client)
        self._register_s3_events(
            client=service_client,
            endpoint_bridge=None,
            endpoint_url=None,
            client_config=client_config,
            scoped_config=scoped_config,
        )
        self._register_s3express_events(client=service_client)
        self._register_s3_control_events(client=service_client)
        self._register_endpoint_discovery(
            service_client, endpoint_url, client_config
        )
        return service_client

    def create_client_class(self, service_name, api_version=None):
        """Return the generated client class without instantiating it."""
        service_model = self._load_service_model(service_name, api_version)
        return self._create_client_class(service_name, service_model)

    def _create_client_class(self, service_name, service_model):
        """Generate the ``BaseClient`` subclass for ``service_model``.

        Handlers of ``creating-client-class.<service-id>`` receive the
        attribute dict and the list of base classes and may mutate both
        before the class is created.
        """
        class_attributes = self._create_methods(service_model)
        py_name_to_operation_name = self._create_name_mapping(service_model)
        class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
        bases = [BaseClient]
        service_id = service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f'creating-client-class.{service_id}',
            class_attributes=class_attributes,
            base_classes=bases,
        )
        class_name = get_service_module_name(service_model)
        cls = type(str(class_name), tuple(bases), class_attributes)
        return cls

    def _normalize_fips_region(self, region_name, client_config):
        """Strip FIPS markers from ``region_name`` and flag FIPS in config.

        If the region contains ``fips-`` or ``-fips``, the markers are
        removed and ``use_fips_endpoint=True`` is merged into the client
        config (or becomes the client config when none was given).

        :return: ``(region_name, client_config)``, possibly updated.
        """
        if region_name is not None:
            normalized_region_name = region_name.replace('fips-', '').replace(
                '-fips', ''
            )
            # If region has been transformed then set flag
            if normalized_region_name != region_name:
                config_use_fips_endpoint = Config(use_fips_endpoint=True)
                if client_config:
                    # Keeping endpoint setting client specific
                    client_config = client_config.merge(
                        config_use_fips_endpoint
                    )
                else:
                    client_config = config_use_fips_endpoint
                # Lazy %-style arguments, consistent with the other log
                # calls in this class; the rendered text is unchanged.
                logger.warning(
                    'transforming region from %s to %s and setting '
                    'use_fips_endpoint to true. client should not '
                    'be configured with a fips psuedo region.',
                    region_name,
                    normalized_region_name,
                )
                region_name = normalized_region_name
        return region_name, client_config

    def _load_service_model(self, service_name, api_version=None):
        """Load the ``service-2`` data and wrap it in a ``ServiceModel``."""
        json_model = self._loader.load_service_model(
            service_name, 'service-2', api_version=api_version
        )
        service_model = ServiceModel(json_model, service_name=service_name)
        return service_model

    def _load_service_endpoints_ruleset(self, service_name, api_version=None):
        """Load the ``endpoint-rule-set-1`` data for ``service_name``."""
        return self._loader.load_service_model(
            service_name, 'endpoint-rule-set-1', api_version=api_version
        )

    def _register_retries(self, client):
        """Register retry handlers matching the client's retry mode.

        Modes other than 'standard', 'adaptive' and 'legacy' register
        nothing.
        """
        retry_mode = client.meta.config.retries['mode']
        if retry_mode == 'standard':
            self._register_v2_standard_retries(client)
        elif retry_mode == 'adaptive':
            # Adaptive mode layers on top of the standard handlers.
            self._register_v2_standard_retries(client)
            self._register_v2_adaptive_retries(client)
        elif retry_mode == 'legacy':
            self._register_legacy_retries(client)

    def _register_v2_standard_retries(self, client):
        """Register the standard retry handler on ``client``.

        ``max_attempts`` is only passed along when the client config sets
        ``total_max_attempts``.
        """
        max_attempts = client.meta.config.retries.get('total_max_attempts')
        kwargs = {'client': client}
        if max_attempts is not None:
            kwargs['max_attempts'] = max_attempts
        standard.register_retry_handler(**kwargs)

    def _register_v2_adaptive_retries(self, client):
        """Register the adaptive retry handler on ``client``."""
        adaptive.register_retry_handler(client)

    def _register_legacy_retries(self, client):
        """Register the legacy retry handler built from the ``_retry`` data.

        Nothing is registered when the loader returns no retry data.
        """
        endpoint_prefix = client.meta.service_model.endpoint_prefix
        service_id = client.meta.service_model.service_id
        service_event_name = service_id.hyphenize()

        # First, we load the entire retry config for all services,
        # then pull out just the information we need.
        original_config = self._loader.load_data('_retry')
        if not original_config:
            return

        retries = self._transform_legacy_retries(client.meta.config.retries)
        retry_config = self._retry_config_translator.build_retry_config(
            endpoint_prefix,
            original_config.get('retry', {}),
            original_config.get('definitions', {}),
            retries,
        )

        logger.debug(
            "Registering retry handlers for service: %s",
            client.meta.service_model.service_name,
        )
        handler = self._retry_handler_factory.create_retry_handler(
            retry_config, endpoint_prefix
        )
        unique_id = f'retry-config-{service_event_name}'
        client.meta.events.register(
            f"needs-retry.{service_event_name}", handler, unique_id=unique_id
        )

    def _transform_legacy_retries(self, retries):
        """Return a copy of ``retries`` in the shape legacy retries expect.

        ``total_max_attempts`` is replaced by ``max_attempts`` with a value
        one lower.  The input mapping is never mutated.

        :return: The transformed copy, or ``None`` when ``retries`` is
            ``None``.
        """
        if retries is None:
            return None
        # A single copy is enough; the original copied the mapping twice.
        copied_args = retries.copy()
        if 'total_max_attempts' in copied_args:
            copied_args['max_attempts'] = (
                copied_args.pop('total_max_attempts') - 1
            )
        return copied_args

    def _get_retry_mode(self, client, config_store):
        """Return the retry mode from the client config or config store.

        The client's ``retries['mode']`` wins when set; otherwise the
        ``retry_mode`` config variable is used, defaulting to 'legacy'.
        """
        client_retries = client.meta.config.retries
        if (
            client_retries is not None
            and client_retries.get('mode') is not None
        ):
            return client_retries['mode']
        return config_store.get_config_variable('retry_mode') or 'legacy'

    def _register_endpoint_discovery(self, client, endpoint_url, config):
        """Register endpoint discovery handlers when applicable.

        Nothing is registered for a custom ``endpoint_url`` or for services
        without an endpoint discovery operation.  Otherwise either the
        discovery handler is attached or, when discovery is not in effect,
        a handler that blocks operations requiring discovery.

        :raises InvalidEndpointDiscoveryConfigurationError: If the
            configured value is not a boolean, a boolean string or 'auto'.
        """
        if endpoint_url is not None:
            # Don't register any handlers in the case of a custom endpoint url
            return
        # Only attach handlers if the service supports discovery
        if client.meta.service_model.endpoint_discovery_operation is None:
            return
        events = client.meta.events
        service_id = client.meta.service_model.service_id.hyphenize()
        enabled = False
        if config and config.endpoint_discovery_enabled is not None:
            enabled = config.endpoint_discovery_enabled
        elif self._config_store:
            enabled = self._config_store.get_config_variable(
                'endpoint_discovery_enabled'
            )

        enabled = self._normalize_endpoint_discovery_config(enabled)
        if enabled and self._requires_endpoint_discovery(client, enabled):
            # Only an explicit True forces discovery for every operation;
            # 'auto' leaves always_discover off.
            discover = enabled is True
            manager = EndpointDiscoveryManager(
                client, always_discover=discover
            )
            handler = EndpointDiscoveryHandler(manager)
            handler.register(events, service_id)
        else:
            events.register(
                'before-parameter-build',
                block_endpoint_discovery_required_operations,
            )

    def _normalize_endpoint_discovery_config(self, enabled):
        """Config must either be a boolean-string or string-literal 'auto'

        :return: ``True``/``False`` for booleans and boolean strings, or
            the string ``'auto'``.
        :raises InvalidEndpointDiscoveryConfigurationError: For any other
            value.
        """
        if isinstance(enabled, str):
            enabled = enabled.lower().strip()
            if enabled == 'auto':
                return enabled
            elif enabled in ('true', 'false'):
                return ensure_boolean(enabled)
        elif isinstance(enabled, bool):
            return enabled

        raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)

    def _requires_endpoint_discovery(self, client, enabled):
        """Tell whether discovery handlers are needed for ``client``.

        In 'auto' mode the answer comes from the service model; otherwise
        ``enabled`` itself is returned.
        """
        if enabled == "auto":
            return client.meta.service_model.endpoint_discovery_required
        return enabled

    def _register_eventbridge_events(
        self, client, endpoint_bridge, endpoint_url
    ):
        """Register the EventBridge signer setter for the 'events' service.

        ``endpoint_bridge`` is accepted but not used.
        """
        if client.meta.service_model.service_name != 'events':
            return
        EventbridgeSignerSetter(
            endpoint_resolver=self._endpoint_resolver,
            region=client.meta.region_name,
            endpoint_url=endpoint_url,
        ).register(client.meta.events)

    def _register_s3express_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the S3 Express identity resolver for S3 clients.

        Only ``client`` is used; the other parameters are accepted but
        ignored.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3ExpressIdentityResolver(client, RefreshableCredentials).register()

    def _register_s3_events(
        self,
        client,
        endpoint_bridge,
        endpoint_url,
        client_config,
        scoped_config,
    ):
        """Register S3-specific handlers on S3 clients.

        Registers the region redirector, the default presign signature
        version and the handler that records selected input parameters.
        ``endpoint_bridge`` and ``endpoint_url`` are accepted but not used.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3RegionRedirectorv2(None, client).register()
        self._set_s3_presign_signature_version(
            client.meta, client_config, scoped_config
        )
        client.meta.events.register(
            'before-parameter-build.s3', self._inject_s3_input_parameters
        )

    def _register_s3_control_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the ARN parameter handler for S3 Control clients.

        Only ``client`` is used; the other parameters are accepted but
        ignored.
        """
        if client.meta.service_model.service_name != 's3control':
            return
        S3ControlArnParamHandlerv2().register(client.meta.events)

    def _set_s3_presign_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Default S3 presigning to sigv2 where that is still appropriate.

        The ``choose-signer.s3`` handler is registered only when the user
        did not configure a signature version, the region is known (or is
        ``aws-global``) and the endpoint lists 's3' among its signature
        versions.
        """
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        provided_signature_version = _get_configured_signature_version(
            's3', client_config, scoped_config
        )
        if provided_signature_version is not None:
            return

        # Check to see if the region is a region that we know about. If we
        # don't know about a region, then we can safely assume it's a new
        # region that is sigv4 only, since all new S3 regions only allow sigv4.
        # The only exception is aws-global. This is a pseudo-region for the
        # global endpoint, we should respect the signature versions it
        # supports, which includes v2.
        regions = self._endpoint_resolver.get_available_endpoints(
            's3', client_meta.partition
        )
        if (
            client_meta.region_name != 'aws-global'
            and client_meta.region_name not in regions
        ):
            return

        # If it is a region we know about, we want to default to sigv2, so here
        # we check to see if it is available.
        endpoint = self._endpoint_resolver.construct_endpoint(
            's3', client_meta.region_name
        )
        signature_versions = endpoint['signatureVersions']
        if 's3' not in signature_versions:
            return

        # We now know that we're in a known region that supports sigv2 and
        # the customer hasn't set a signature version so we default the
        # signature version to sigv2.
        client_meta.events.register(
            'choose-signer.s3', self._default_s3_presign_to_sigv2
        )

    def _inject_s3_input_parameters(self, params, context, **kwargs):
        """Copy selected S3 input parameters into ``context``.

        ``context['input_params']`` is reset to a new dict holding whichever
        of 'Bucket', 'Delete', 'Key' and 'Prefix' appear in ``params``.
        """
        context['input_params'] = {}
        inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix')
        for inject_parameter in inject_parameters:
            if inject_parameter in params:
                context['input_params'][inject_parameter] = params[
                    inject_parameter
                ]

    def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
        """
        Returns the 's3' (sigv2) signer if presigning an s3 request. This is
        intended to be used to set the default signature version for the signer
        to sigv2. Situations where an asymmetric signature is required are the
        exception, for example MRAP needs v4a.

        :type signature_version: str
        :param signature_version: The current client signature version.

        :param kwargs: Any other keyword arguments supplied by the event;
            they are accepted and ignored.

        :return: ``'s3-query'`` or ``'s3-presign-post'`` when the signature
            version ends in that suffix, the unchanged signature version
            for ``v4-s3express*``, and ``None`` otherwise (including every
            ``v4a*`` version).
        """
        if signature_version.startswith('v4a'):
            return None

        if signature_version.startswith('v4-s3express'):
            return signature_version

        for suffix in ['-query', '-presign-post']:
            if signature_version.endswith(suffix):
                return f's3{suffix}'
        return None

    def _get_client_args(
        self,
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    ):
        """Build the client constructor kwargs via ``ClientArgsCreator``."""
        args_creator = ClientArgsCreator(
            self._event_emitter,
            self._user_agent,
            self._response_parser_factory,
            self._loader,
            self._exceptions_factory,
            config_store=self._config_store,
            user_agent_creator=self._user_agent_creator,
        )
        return args_creator.get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )

    def _create_methods(self, service_model):
        """Return ``{python_method_name: api_method}`` for every operation."""
        op_dict = {}
        for operation_name in service_model.operation_names:
            py_operation_name = xform_name(operation_name)
            op_dict[py_operation_name] = self._create_api_method(
                py_operation_name, operation_name, service_model
            )
        return op_dict

    def _create_name_mapping(self, service_model):
        """Return the py_name -> OperationName mapping for the service."""
        # py_name -> OperationName, for every operation available
        # for a service.
        mapping = {}
        for operation_name in service_model.operation_names:
            py_operation_name = xform_name(operation_name)
            mapping[py_operation_name] = operation_name
        return mapping

    def _create_api_method(
        self, py_operation_name, operation_name, service_model
    ):
        """Create the client method that invokes ``operation_name``.

        The returned function is named ``py_operation_name``, accepts
        keyword arguments only and carries a ``ClientMethodDocstring`` as
        its ``__doc__``.
        """

        def _api_call(self, *args, **kwargs):
            # We're accepting *args so that we can give a more helpful
            # error message than TypeError: _api_call takes exactly
            # 1 argument.
            if args:
                raise TypeError(
                    f"{py_operation_name}() only accepts keyword arguments."
                )
            # The "self" in this scope is referring to the BaseClient.
            return self._make_api_call(operation_name, kwargs)

        _api_call.__name__ = str(py_operation_name)

        # Add the docstring to the client method
        operation_model = service_model.operation_model(operation_name)
        docstring = ClientMethodDocstring(
            operation_model=operation_model,
            method_name=operation_name,
            event_emitter=self._event_emitter,
            method_description=operation_model.documentation,
            example_prefix=f'response = client.{py_operation_name}',
            include_signature=False,
        )
        _api_call.__doc__ = docstring
        return _api_call
class ClientEndpointBridge:
    """Bridges endpoint data and client creation

    This class handles taking out the relevant arguments from the endpoint
    resolver and determining which values to use, taking into account any
    client configuration options and scope configuration options.

    This class also handles determining what, if any, region to use if no
    explicit region setting is provided. For example, Amazon S3 client will
    utilize "us-east-1" by default if no region can be resolved."""

    # Hostname template used when no endpoint could be resolved.
    DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
    # Services whose dualstack flag is read from the ``s3`` config section.
    _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']

    def __init__(
        self,
        endpoint_resolver,
        scoped_config=None,
        client_config=None,
        default_endpoint=None,
        service_signing_name=None,
        config_store=None,
        service_signature_version=None,
    ):
        """Store the resolver and the configuration sources to consult.

        :param endpoint_resolver: Resolver providing ``construct_endpoint``.
        :param scoped_config: Optional scoped (profile) configuration dict.
        :param client_config: Optional client ``Config`` object.
        :param default_endpoint: Hostname template overriding
            ``DEFAULT_ENDPOINT``; formatted with ``service`` and ``region``.
        :param service_signing_name: Signing name from the service model.
        :param config_store: Optional config store consulted for endpoint
            variant settings.
        :param service_signature_version: Signature version from the
            service model.
        """
        self.service_signing_name = service_signing_name
        self.endpoint_resolver = endpoint_resolver
        self.scoped_config = scoped_config
        self.client_config = client_config
        self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
        self.config_store = config_store
        self.service_signature_version = service_signature_version

    def resolve(
        self, service_name, region_name=None, endpoint_url=None, is_secure=True
    ):
        """Resolve endpoint information for a service and region.

        The resolver is asked first without a partition and then, if that
        yields nothing, with the 'aws' partition.  If both fail, an endpoint
        is assumed from the default hostname template.

        :return: A dict with the keys ``service_name``, ``region_name``,
            ``signing_region``, ``signing_name``, ``endpoint_url``,
            ``signature_version`` and ``metadata``.
        """
        region_name = self._check_default_region(service_name, region_name)
        use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
            service_name
        )
        use_fips_endpoint = self._resolve_endpoint_variant_config_var(
            'use_fips_endpoint'
        )
        resolved = self.endpoint_resolver.construct_endpoint(
            service_name,
            region_name,
            use_dualstack_endpoint=use_dualstack_endpoint,
            use_fips_endpoint=use_fips_endpoint,
        )

        # If we can't resolve the region, we'll attempt to get a global
        # endpoint for non-regionalized services (iam, route53, etc)
        if not resolved:
            # TODO: fallback partition_name should be configurable in the
            # future for users to define as needed.
            resolved = self.endpoint_resolver.construct_endpoint(
                service_name,
                region_name,
                partition_name='aws',
                use_dualstack_endpoint=use_dualstack_endpoint,
                use_fips_endpoint=use_fips_endpoint,
            )

        if resolved:
            return self._create_endpoint(
                resolved, service_name, region_name, endpoint_url, is_secure
            )
        else:
            return self._assume_endpoint(
                service_name, region_name, endpoint_url, is_secure
            )

    def resolver_uses_builtin_data(self):
        """Return the resolver's ``uses_builtin_data`` attribute."""
        return self.endpoint_resolver.uses_builtin_data

    def _check_default_region(self, service_name, region_name):
        """Return the explicit region, else the client config's region.

        :return: The region name, or ``None`` when neither source has one.
        """
        if region_name is not None:
            return region_name
        # Use the client_config region if no explicit region was provided.
        if self.client_config and self.client_config.region_name is not None:
            return self.client_config.region_name
        return None

    def _create_endpoint(
        self, resolved, service_name, region_name, endpoint_url, is_secure
    ):
        """Build the result dict from a successfully resolved endpoint.

        A user supplied ``endpoint_url`` is kept as is; otherwise the URL
        is built from the resolved hostname and protocols.
        """
        region_name, signing_region = self._pick_region_values(
            resolved, region_name, endpoint_url
        )
        if endpoint_url is None:
            endpoint_url = self._make_url(
                resolved.get('hostname'),
                is_secure,
                resolved.get('protocols', []),
            )
        signature_version = self._resolve_signature_version(
            service_name, resolved
        )
        signing_name = self._resolve_signing_name(service_name, resolved)
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=signing_region,
            signing_name=signing_name,
            endpoint_url=endpoint_url,
            metadata=resolved,
            signature_version=signature_version,
        )

    def _resolve_endpoint_variant_config_var(self, config_var):
        """Look up an endpoint variant flag such as ``use_fips_endpoint``.

        :return: The client config value when it is not ``None``, else the
            config store value, else ``False`` when there is no store.
        """
        client_config = self.client_config
        config_val = False

        # Client configuration arg has precedence
        if client_config and getattr(client_config, config_var) is not None:
            return getattr(client_config, config_var)
        elif self.config_store is not None:
            # Check config store
            config_val = self.config_store.get_config_variable(config_var)
        return config_val

    def _resolve_use_dualstack_endpoint(self, service_name):
        """Resolve the dualstack flag, honouring the S3-specific setting."""
        s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
        if s3_dualstack_mode is not None:
            return s3_dualstack_mode
        return self._resolve_endpoint_variant_config_var(
            'use_dualstack_endpoint'
        )

    def _is_s3_dualstack_mode(self, service_name):
        """Return the S3-section dualstack setting, if one applies.

        :return: The client config's ``s3['use_dualstack_endpoint']`` value
            when present, ``True`` when the scoped config enables it, and
            ``None`` otherwise (including for non S3 services).
        """
        if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
            return None
        # TODO: This normalization logic is duplicated from the
        # ClientArgsCreator class.  Consolidate everything to
        # ClientArgsCreator.  _resolve_signature_version also has similarly
        # duplicated logic.
        client_config = self.client_config
        if (
            client_config is not None
            and client_config.s3 is not None
            and 'use_dualstack_endpoint' in client_config.s3
        ):
            # Client config trumps scoped config.
            return client_config.s3['use_dualstack_endpoint']
        if self.scoped_config is not None:
            enabled = self.scoped_config.get('s3', {}).get(
                'use_dualstack_endpoint'
            )
            if enabled in [True, 'True', 'true']:
                return True
        return None

    def _assume_endpoint(
        self, service_name, region_name, endpoint_url, is_secure
    ):
        """Build a result dict when the resolver produced no endpoint.

        Without a user supplied ``endpoint_url`` the URL is built from the
        default hostname template.  The signature version is resolved
        against an assumed ``['v4']`` list.
        """
        if endpoint_url is None:
            # Expand the default hostname URI template.
            hostname = self.default_endpoint.format(
                service=service_name, region=region_name
            )
            endpoint_url = self._make_url(
                hostname, is_secure, ['http', 'https']
            )
        # Lazy %-style arguments so the message is only formatted when
        # debug logging is enabled; the rendered text is unchanged.
        logger.debug(
            'Assuming an endpoint for %s, %s: %s',
            service_name,
            region_name,
            endpoint_url,
        )
        # We still want to allow the user to provide an explicit version.
        signature_version = self._resolve_signature_version(
            service_name, {'signatureVersions': ['v4']}
        )
        signing_name = self._resolve_signing_name(service_name, resolved={})
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=region_name,
            signing_name=signing_name,
            signature_version=signature_version,
            endpoint_url=endpoint_url,
            metadata={},
        )

    def _create_result(
        self,
        service_name,
        region_name,
        signing_region,
        signing_name,
        endpoint_url,
        signature_version,
        metadata,
    ):
        """Pack the resolved values into the dict returned by ``resolve``."""
        return {
            'service_name': service_name,
            'region_name': region_name,
            'signing_region': signing_region,
            'signing_name': signing_name,
            'endpoint_url': endpoint_url,
            'signature_version': signature_version,
            'metadata': metadata,
        }

    def _make_url(self, hostname, is_secure, supported_protocols):
        """Return ``scheme://hostname``.

        'https' is used only when ``is_secure`` is truthy and 'https' is a
        supported protocol; otherwise the scheme is 'http'.
        """
        if is_secure and 'https' in supported_protocols:
            scheme = 'https'
        else:
            scheme = 'http'
        return f'{scheme}://{hostname}'

    def _resolve_signing_name(self, service_name, resolved):
        """Pick the signing name for the service.

        Precedence: the endpoint's ``credentialScope.service``, then the
        signing name from the service model, then ``service_name``.
        """
        # CredentialScope overrides everything else.
        if (
            'credentialScope' in resolved
            and 'service' in resolved['credentialScope']
        ):
            return resolved['credentialScope']['service']
        # Use the signingName from the model if present.
        if self.service_signing_name:
            return self.service_signing_name
        # Just assume is the same as the service name.
        return service_name

    def _pick_region_values(self, resolved, region_name, endpoint_url):
        """Return ``(region_name, signing_region)`` for the endpoint.

        With a user supplied ``endpoint_url`` both values are the given
        ``region_name``; otherwise they come from the resolved endpoint.
        """
        signing_region = region_name
        if endpoint_url is None:
            # Do not use the region name or signing name from the resolved
            # endpoint if the user explicitly provides an endpoint_url. This
            # would happen if we resolve to an endpoint where the service has
            # a "defaults" section that overrides all endpoint with a single
            # hostname and credentialScope. This has been the case historically
            # for how STS has worked. The only way to resolve an STS endpoint
            # was to provide a region_name and an endpoint_url. In that case,
            # we would still resolve an endpoint, but we would not use the
            # resolved endpointName or signingRegion because we want to allow
            # custom endpoints.
            region_name = resolved['endpointName']
            signing_region = region_name
            if (
                'credentialScope' in resolved
                and 'region' in resolved['credentialScope']
            ):
                signing_region = resolved['credentialScope']['region']
        return region_name, signing_region

    def _resolve_signature_version(self, service_name, resolved):
        """Choose the signature version for the service.

        A user configured version always wins.  Otherwise candidates come
        from the endpoint metadata, or from the service model when it names
        a non-legacy version.

        :raises UnknownSignatureVersionError: If no candidate can be used.
        """
        configured_version = _get_configured_signature_version(
            service_name, self.client_config, self.scoped_config
        )
        if configured_version is not None:
            return configured_version

        potential_versions = resolved.get('signatureVersions', [])
        if (
            self.service_signature_version is not None
            and self.service_signature_version
            not in _LEGACY_SIGNATURE_VERSIONS
        ):
            # Prefer the service model as most specific
            # source of truth for new signature versions.
            potential_versions = [self.service_signature_version]

        # Pick a signature version from the endpoint metadata if present.
        if 'signatureVersions' in resolved:
            if service_name == 's3':
                return 's3v4'
            if 'v4' in potential_versions:
                return 'v4'
            # Now just iterate over the signature versions in order until we
            # find the first one that is known to Botocore.
            for known in potential_versions:
                if known in AUTH_TYPE_MAPS:
                    return known
        raise UnknownSignatureVersionError(
            signature_version=potential_versions
        )
860class BaseClient:
861 # This is actually reassigned with the py->op_name mapping
862 # when the client creator creates the subclass. This value is used
863 # because calls such as client.get_paginator('list_objects') use the
864 # snake_case name, but we need to know the ListObjects form.
865 # xform_name() does the ListObjects->list_objects conversion, but
866 # we need the reverse mapping here.
867 _PY_TO_OP_NAME = {}
def __init__(
    self,
    serializer,
    endpoint,
    response_parser,
    event_emitter,
    request_signer,
    service_model,
    loader,
    client_config,
    partition,
    exceptions_factory,
    endpoint_ruleset_resolver=None,
    user_agent_creator=None,
):
    """Wire up a client from its pre-built collaborators.

    Stores the collaborators on the instance, builds ``self.meta`` from
    the event emitter, client config, endpoint host, service model,
    operation-name mapping and partition, and finally registers the
    request signing handler.

    :param user_agent_creator: Optional user agent builder.  When it is
        ``None``, one is created from the environment and combined with
        ``client_config``.
    """
    self._serializer = serializer
    self._endpoint = endpoint
    self._ruleset_resolver = endpoint_ruleset_resolver
    self._response_parser = response_parser
    self._request_signer = request_signer
    self._cache = {}
    self._loader = loader
    self._client_config = client_config
    self.meta = ClientMeta(
        event_emitter,
        self._client_config,
        endpoint.host,
        service_model,
        self._PY_TO_OP_NAME,
        partition,
    )
    self._exceptions_factory = exceptions_factory
    self._exceptions = None
    # Fall back to a user agent derived from the environment when the
    # caller did not supply a creator.
    if user_agent_creator is None:
        environment_user_agent = UserAgentString.from_environment()
        user_agent_creator = environment_user_agent.with_client_config(
            self._client_config
        )
    self._user_agent_creator = user_agent_creator
    self._register_handlers()
def __getattr__(self, item):
    """Resolve a missing attribute through ``getattr.<service-id>.<item>``.

    The first non-``None`` handler response is returned as the attribute
    value.

    :raises AttributeError: If no handler supplies a value.
    """
    # NOTE(review): this reads ``self._service_model`` and therefore
    # ``self.meta``; on an instance where ``meta`` was never set this
    # looks like it would re-enter ``__getattr__`` - confirm.
    hyphenized_id = self._service_model.service_id.hyphenize()
    _, event_response = self.meta.events.emit_until_response(
        f'getattr.{hyphenized_id}.{item}', client=self
    )

    if event_response is None:
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{item}'"
        )
    return event_response
def close(self):
    """Closes underlying endpoint connections."""
    endpoint = self._endpoint
    endpoint.close()
def _register_handlers(self):
    """Register the request signer on ``request-created.<service-id>``."""
    # Register the handler required to sign requests.
    hyphenized_id = self.meta.service_model.service_id.hyphenize()
    signing_event = f"request-created.{hyphenized_id}"
    self.meta.events.register(signing_event, self._request_signer.handler)
@property
def _service_model(self):
    """The service model held by ``self.meta``."""
    client_meta = self.meta
    return client_meta.service_model
def _make_api_call(self, operation_name, api_params):
    """Invoke ``operation_name`` with ``api_params`` and return the result.

    The call is recorded in the history recorder, the parameters are
    passed through ``_emit_api_params``, the endpoint is resolved, and
    the request dict is built.  ``before-call`` handlers may short-circuit
    the HTTP request by returning an ``(http, parsed_response)`` pair;
    ``after-call`` is always emitted.

    :return: The parsed response when the HTTP status code is below 300.
    :raises: The exception class looked up via
        ``self.exceptions.from_code`` for status codes of 300 and above.
    """
    operation_model = self._service_model.operation_model(operation_name)
    service_name = self._service_model.service_name
    call_details = {
        'service': service_name,
        'operation': operation_name,
        'params': api_params,
    }
    history_recorder.record('API_CALL', call_details)
    if operation_model.deprecated:
        logger.debug(
            'Warning: %s.%s() is deprecated', service_name, operation_name
        )
    request_context = {
        'client_region': self.meta.region_name,
        'client_config': self.meta.config,
        'has_streaming_input': operation_model.has_streaming_input,
        'auth_type': operation_model.resolved_auth_type,
        'unsigned_payload': operation_model.unsigned_payload,
    }

    api_params = self._emit_api_params(
        api_params=api_params,
        operation_model=operation_model,
        context=request_context,
    )
    endpoint_info = self._resolve_endpoint_ruleset(
        operation_model, api_params, request_context
    )
    endpoint_url, additional_headers, properties = endpoint_info
    if properties:
        # Pass arbitrary endpoint info with the Request
        # for use during construction.
        request_context['endpoint_properties'] = properties
    request_dict = self._convert_to_request_dict(
        api_params=api_params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=request_context,
        headers=additional_headers,
    )
    resolve_checksum_context(request_dict, operation_model, api_params)

    service_id = self._service_model.service_id.hyphenize()
    _, event_response = self.meta.events.emit_until_response(
        f'before-call.{service_id}.{operation_name}',
        model=operation_model,
        params=request_dict,
        request_signer=self._request_signer,
        context=request_context,
    )

    if event_response is None:
        # No handler supplied a response, so send the request.
        maybe_compress_request(
            self.meta.config, request_dict, operation_model
        )
        apply_request_checksum(request_dict)
        http, parsed_response = self._make_request(
            operation_model, request_dict, request_context
        )
    else:
        http, parsed_response = event_response

    self.meta.events.emit(
        f'after-call.{service_id}.{operation_name}',
        http_response=http,
        parsed=parsed_response,
        model=operation_model,
        context=request_context,
    )

    if http.status_code < 300:
        return parsed_response

    # "QueryErrorCode" takes precedence over "Code" when both are present.
    error_info = parsed_response.get("Error", {})
    error_code = error_info.get("QueryErrorCode") or error_info.get("Code")
    error_class = self.exceptions.from_code(error_code)
    raise error_class(parsed_response, operation_name)
def _make_request(self, operation_model, request_dict, request_context):
    """Send the request through the endpoint.

    If the endpoint raises, an
    ``after-call-error.<service-id>.<operation>`` event is emitted with
    the exception and the request context, and the exception is then
    re-raised unchanged.

    :return: Whatever ``self._endpoint.make_request`` returns.
    """
    try:
        outcome = self._endpoint.make_request(operation_model, request_dict)
    except Exception as error:
        hyphenized_id = self._service_model.service_id.hyphenize()
        self.meta.events.emit(
            f'after-call-error.{hyphenized_id}.{operation_model.name}',
            exception=error,
            context=request_context,
        )
        raise
    return outcome
def _convert_to_request_dict(
    self,
    api_params,
    operation_model,
    endpoint_url,
    context=None,
    headers=None,
    set_user_agent_header=True,
):
    """Serialize API parameters into a prepared request dict.

    :param api_params: Parameters to serialize.
    :param operation_model: Model of the operation being called.
    :param endpoint_url: Endpoint URL passed to ``prepare_request_dict``.
    :param context: Optional request context passed to
        ``prepare_request_dict``.
    :param headers: Optional extra headers merged into the serialized
        request's headers.
    :param set_user_agent_header: When false, ``None`` is passed as the
        user agent instead of the client's user agent string.

    :return: The serialized and prepared request dict.
    """
    serialized = self._serializer.serialize_to_request(
        api_params, operation_model
    )
    if not self._client_config.inject_host_prefix:
        # Host prefix injection is disabled by configuration.
        serialized.pop('host_prefix', None)
    if headers is not None:
        serialized['headers'].update(headers)

    user_agent = (
        self._user_agent_creator.to_string()
        if set_user_agent_header
        else None
    )
    prepare_request_dict(
        serialized,
        endpoint_url=endpoint_url,
        user_agent=user_agent,
        context=context,
    )
    return serialized
def _emit_api_params(self, api_params, operation_model, context):
    """Run the parameter-customization events for an operation.

    Two events are emitted, in order:

    * ``provide-client-params.<service-id>.<operation>`` - a handler may
      return a replacement set of parameters; the first non-``None``
      response wins, otherwise the original parameters are kept.
    * ``before-parameter-build.<service-id>.<operation>`` - emitted with
      the parameters chosen above; its responses are not used here.

    :return: The parameters to use for the rest of the call.
    """
    events = self.meta.events
    event_suffix = (
        f'{self._service_model.service_id.hyphenize()}.{operation_model.name}'
    )

    provided = events.emit(
        f'provide-client-params.{event_suffix}',
        params=api_params,
        model=operation_model,
        context=context,
    )
    api_params = first_non_none_response(provided, default=api_params)

    events.emit(
        f'before-parameter-build.{event_suffix}',
        params=api_params,
        model=operation_model,
        context=context,
    )
    return api_params
def _resolve_endpoint_ruleset(
    self,
    operation_model,
    params,
    request_context,
    ignore_signing_region=False,
):
    """Resolve the endpoint for the given operation and params.

    When no ruleset resolver is configured (``self._ruleset_resolver`` is
    ``None``), the client's own ``meta.endpoint_url`` is returned together
    with empty headers and properties.

    Use ``ignore_signing_region`` for generating presigned URLs or any
    other situation where the signing region information from the ruleset
    resolver should be ignored.

    Returns a 3-tuple of ``(endpoint_url, additional_headers,
    endpoint_properties)``. Additionally, when the resolved endpoint
    carries ``authSchemes``, the ``request_context`` dict is modified in
    place: ``auth_type`` is overwritten and the signing context is merged
    into (or stored as) ``request_context['signing']``.
    """
    resolver = self._ruleset_resolver
    if resolver is None:
        return self.meta.endpoint_url, {}, {}

    endpoint_info = resolver.construct_endpoint(
        operation_model=operation_model,
        call_args=params,
        request_context=request_context,
    )
    endpoint_url = endpoint_info.url
    additional_headers = endpoint_info.headers
    endpoint_properties = endpoint_info.properties

    # If authSchemes is present, overwrite default auth type and
    # signing context derived from service model.
    auth_schemes = endpoint_info.properties.get('authSchemes')
    if auth_schemes is not None:
        auth_type, signing_context = resolver.auth_schemes_to_signing_ctx(
            auth_schemes
        )
        request_context['auth_type'] = auth_type
        if ignore_signing_region and 'region' in signing_context:
            del signing_context['region']
        if 'signing' in request_context:
            request_context['signing'].update(signing_context)
        else:
            request_context['signing'] = signing_context

    return endpoint_url, additional_headers, endpoint_properties
def get_paginator(self, operation_name):
    """Create a paginator for an operation.

    :type operation_name: string
    :param operation_name: The operation name, spelled the same way as
        the method name on the client. For example, if the method name is
        ``create_foo`` and you would normally call
        ``client.create_foo(**kwargs)``, then, provided ``create_foo``
        can be paginated, use ``client.get_paginator("create_foo")``.

    :raise OperationNotPageableError: Raised if the operation is not
        pageable. You can use the ``client.can_paginate`` method to
        check if an operation is pageable.

    :rtype: ``botocore.paginate.Paginator``
    :return: A paginator object.

    """
    if not self.can_paginate(operation_name):
        raise OperationNotPageableError(operation_name=operation_name)

    actual_operation_name = self._PY_TO_OP_NAME[operation_name]

    # A dedicated ``paginate`` function is defined per paginator so that
    # an operation-specific docstring can be attached to it; it simply
    # forwards to ``Paginator.paginate``.
    def paginate(self, **kwargs):
        return Paginator.paginate(self, **kwargs)

    paginator_config = self._cache['page_config'][actual_operation_name]
    paginate.__doc__ = PaginatorDocstring(
        paginator_name=actual_operation_name,
        event_emitter=self.meta.events,
        service_model=self.meta.service_model,
        paginator_config=paginator_config,
        include_signature=False,
    )

    # The generated class is named after the service module and the
    # operation it paginates.
    service_module_name = get_service_module_name(self.meta.service_model)
    documented_paginator_cls = type(
        f"{service_module_name}.Paginator.{actual_operation_name}",
        (Paginator,),
        {'paginate': paginate},
    )

    operation_model = self._service_model.operation_model(
        actual_operation_name
    )
    client_method = getattr(self, operation_name)
    return documented_paginator_cls(
        client_method, paginator_config, operation_model
    )
def can_paginate(self, operation_name):
    """Check if an operation can be paginated.

    :type operation_name: string
    :param operation_name: The operation name. This is the same name
        as the method name on the client. For example, if the
        method name is ``create_foo``, and you'd normally invoke the
        operation as ``client.create_foo(**kwargs)``, if the
        ``create_foo`` operation can be paginated, you can use the
        call ``client.get_paginator("create_foo")``.

    :return: ``True`` if the operation can be paginated,
        ``False`` otherwise. An ``operation_name`` that does not map to
        any operation of this client is reported as ``False`` rather
        than raising ``KeyError``.

    """
    if 'page_config' not in self._cache:
        # The pagination config is loaded at most once per client and
        # kept in the client cache; a service without a paginators file
        # is cached as an empty config.
        try:
            page_config = self._loader.load_service_model(
                self._service_model.service_name,
                'paginators-1',
                self._service_model.api_version,
            )['pagination']
            self._cache['page_config'] = page_config
        except DataNotFoundError:
            self._cache['page_config'] = {}

    # An unknown method name cannot be paginated; answer False instead
    # of leaking a KeyError from the name mapping.
    actual_operation_name = self._PY_TO_OP_NAME.get(operation_name)
    if actual_operation_name is None:
        return False
    return actual_operation_name in self._cache['page_config']
def _get_waiter_config(self):
    """Return the service's ``waiters-2`` model, loading it on first use.

    The result is stored in the client cache under ``'waiter_config'``.
    If the loader raises ``DataNotFoundError``, an empty dict is cached
    and returned instead.
    """
    cache = self._cache
    if 'waiter_config' in cache:
        return cache['waiter_config']

    try:
        loaded = self._loader.load_service_model(
            self._service_model.service_name,
            'waiters-2',
            self._service_model.api_version,
        )
    except DataNotFoundError:
        loaded = {}
    cache['waiter_config'] = loaded
    return cache['waiter_config']
def get_waiter(self, waiter_name):
    """Returns an object that can wait for some condition.

    :type waiter_name: str
    :param waiter_name: The name of the waiter to get. See the waiters
        section of the service docs for a list of available waiters.

    :returns: The specified waiter object.
    :rtype: ``botocore.waiter.Waiter``
    :raises ValueError: If the service has no waiter config or no waiter
        with the given name.
    """
    config = self._get_waiter_config()
    if not config:
        raise ValueError(f"Waiter does not exist: {waiter_name}")

    model = waiter.WaiterModel(config)
    # Callers pass the ``xform_name`` form of a waiter name; map it back
    # to the name used in the waiter model.
    model_name_by_py_name = {
        xform_name(name): name for name in model.waiter_names
    }
    if waiter_name not in model_name_by_py_name:
        raise ValueError(f"Waiter does not exist: {waiter_name}")

    return waiter.create_waiter_with_client(
        model_name_by_py_name[waiter_name], model, self
    )
@CachedProperty
def waiter_names(self):
    """Returns a list of all available waiters."""
    config = self._get_waiter_config()
    if config:
        # Only the waiter names are needed; each is converted with
        # ``xform_name`` before being returned.
        model = waiter.WaiterModel(config)
        return [xform_name(model_name) for model_name in model.waiter_names]
    return []
@property
def exceptions(self):
    """The client's exceptions object, created on first access.

    The value produced by ``_load_exceptions`` is stored on
    ``self._exceptions`` and reused while it is not ``None``.
    """
    if self._exceptions is not None:
        return self._exceptions
    self._exceptions = self._load_exceptions()
    return self._exceptions
def _load_exceptions(self):
    """Ask the exceptions factory for this service's client exceptions."""
    factory = self._exceptions_factory
    return factory.create_client_exceptions(self._service_model)
def _get_credentials(self):
    """
    Return the credentials held by the request signer.

    This private interface is subject to abrupt breaking changes, including
    removal, in any botocore release.
    """
    signer = self._request_signer
    return signer._credentials
class ClientMeta:
    """Container for client-level metadata.

    Keeping this information on a separate object serves two purposes:

    * It gives clients a place for advanced functionality.
    * It namespaces these attributes away from the operation names that
      are mapped to client methods at runtime, so the two can never
      collide.

    """

    def __init__(
        self,
        events,
        client_config,
        endpoint_url,
        service_model,
        method_to_api_mapping,
        partition,
    ):
        # ``events`` is the only value exposed as a plain attribute; the
        # rest are stored privately and surfaced via read-only properties.
        self.events = events
        self._client_config = client_config
        self._endpoint_url = endpoint_url
        self._service_model = service_model
        self._method_to_api_mapping = method_to_api_mapping
        self._partition = partition

    @property
    def service_model(self):
        """The service model passed at construction."""
        return self._service_model

    @property
    def region_name(self):
        """The ``region_name`` of the client config."""
        return self._client_config.region_name

    @property
    def endpoint_url(self):
        """The endpoint URL passed at construction."""
        return self._endpoint_url

    @property
    def config(self):
        """The client config passed at construction."""
        return self._client_config

    @property
    def method_to_api_mapping(self):
        """The method-to-API mapping passed at construction."""
        return self._method_to_api_mapping

    @property
    def partition(self):
        """The partition passed at construction."""
        return self._partition
def _get_configured_signature_version(
    service_name, client_config, scoped_config
):
    """
    Gets the manually configured signature version.

    Lookup order:

    1. ``client_config.signature_version`` when it is not ``None``.
    2. The ``signature_version`` entry of the service-specific dict
       stored under ``service_name`` in ``scoped_config``, when truthy.

    :returns: the customer configured signature version, or None if no
        signature version was configured.
    """
    # Client config overrides everything.
    if client_config and client_config.signature_version is not None:
        return client_config.signature_version

    if scoped_config is None:
        return None

    # A given service may have service specific configuration in the
    # config file; only a dict-valued entry is considered.
    service_config = scoped_config.get(service_name)
    if not isinstance(service_config, dict):
        return None

    version = service_config.get('signature_version')
    if not version:
        return None

    logger.debug(
        "Switching signature version for service %s "
        "to version %s based on config file override.",
        service_name,
        version,
    )
    return version