Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/client.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

560 statements  

1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import logging 

14 

15from botocore import ( 

16 UNSIGNED, # noqa: F401 

17 waiter, 

18 xform_name, 

19) 

20from botocore.args import ClientArgsCreator 

21from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type 

22from botocore.awsrequest import prepare_request_dict 

23from botocore.compress import maybe_compress_request 

24from botocore.config import Config 

25from botocore.context import with_current_context 

26from botocore.credentials import RefreshableCredentials 

27from botocore.discovery import ( 

28 EndpointDiscoveryHandler, 

29 EndpointDiscoveryManager, 

30 block_endpoint_discovery_required_operations, 

31) 

32from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring 

33from botocore.exceptions import ( 

34 ClientError, # noqa: F401 

35 DataNotFoundError, 

36 InvalidEndpointDiscoveryConfigurationError, 

37 OperationNotPageableError, 

38 UnknownServiceError, 

39 UnknownSignatureVersionError, 

40) 

41from botocore.history import get_global_history_recorder 

42from botocore.hooks import first_non_none_response 

43from botocore.httpchecksum import ( 

44 apply_request_checksum, 

45 resolve_checksum_context, 

46) 

47from botocore.model import ServiceModel 

48from botocore.paginate import Paginator 

49from botocore.retries import adaptive, standard 

50from botocore.useragent import UserAgentString, register_feature_id 

51from botocore.utils import ( 

52 CachedProperty, 

53 EventbridgeSignerSetter, 

54 S3ArnParamHandler, # noqa: F401 

55 S3ControlArnParamHandler, # noqa: F401 

56 S3ControlArnParamHandlerv2, 

57 S3ControlEndpointSetter, # noqa: F401 

58 S3EndpointSetter, # noqa: F401 

59 S3ExpressIdentityResolver, 

60 S3RegionRedirector, # noqa: F401 

61 S3RegionRedirectorv2, 

62 ensure_boolean, 

63 get_service_module_name, 

64) 

65 

# Module-level logger, named after this module via __name__.
66logger = logging.getLogger(__name__) 

# Recorder object returned by get_global_history_recorder(); bound once at import time.
67history_recorder = get_global_history_recorder() 

68 

69 

class ClientCreator:
    """Creates client objects for a service."""

    def __init__(
        self,
        loader,
        endpoint_resolver,
        user_agent,
        event_emitter,
        retry_handler_factory,
        retry_config_translator,
        response_parser_factory=None,
        exceptions_factory=None,
        config_store=None,
        user_agent_creator=None,
        auth_token_resolver=None,
    ):
        """Store the collaborators used later when building clients.

        Nothing is loaded or resolved here; every argument is simply kept
        on the instance for use by ``create_client`` and its helpers.
        """
        self._loader = loader
        self._endpoint_resolver = endpoint_resolver
        self._user_agent = user_agent
        self._event_emitter = event_emitter
        self._retry_handler_factory = retry_handler_factory
        self._retry_config_translator = retry_config_translator
        self._response_parser_factory = response_parser_factory
        self._exceptions_factory = exceptions_factory
        # TODO: Migrate things away from scoped_config in favor of the
        # config_store.  The config store can pull things from both the scoped
        # config and environment variables (and potentially more in the
        # future).
        self._config_store = config_store
        self._user_agent_creator = user_agent_creator
        self._auth_token_resolver = auth_token_resolver

    def create_client(
        self,
        service_name,
        region_name,
        is_secure=True,
        endpoint_url=None,
        verify=None,
        credentials=None,
        scoped_config=None,
        api_version=None,
        client_config=None,
        auth_token=None,
    ):
        """Build and return a fully wired client instance for a service.

        The steps are: let ``choose-service-name`` handlers rename the
        service, load the service model (and, when available, the endpoint
        ruleset and partition data), generate the client class, normalize a
        FIPS pseudo region, construct the client and finally register the
        retry, S3, S3 Control, importexport and endpoint discovery handlers.

        :return: The instantiated service client.
        """
        responses = self._event_emitter.emit(
            'choose-service-name', service_name=service_name
        )
        service_name = first_non_none_response(responses, default=service_name)
        service_model = self._load_service_model(service_name, api_version)
        try:
            endpoints_ruleset_data = self._load_service_endpoints_ruleset(
                service_name, api_version
            )
            partition_data = self._loader.load_data('partitions')
        except UnknownServiceError:
            endpoints_ruleset_data = None
            partition_data = None
            logger.info(
                'No endpoints ruleset found for service %s, falling back to '
                'legacy endpoint routing.',
                service_name,
            )

        cls = self._create_client_class(service_name, service_model)
        region_name, client_config = self._normalize_fips_region(
            region_name, client_config
        )
        # The modeled "auth" list takes precedence over the older
        # "signatureVersion" metadata key.
        if auth := service_model.metadata.get('auth'):
            service_signature_version = resolve_auth_type(auth)
        else:
            service_signature_version = service_model.metadata.get(
                'signatureVersion'
            )
        endpoint_bridge = ClientEndpointBridge(
            self._endpoint_resolver,
            scoped_config,
            client_config,
            service_signing_name=service_model.metadata.get('signingName'),
            config_store=self._config_store,
            service_signature_version=service_signature_version,
        )
        # A truthy token from the resolver replaces the caller's auth_token.
        if token := self._evaluate_client_specific_token(
            service_model.signing_name
        ):
            auth_token = token
        client_args = self._get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )
        service_client = cls(**client_args)
        self._register_retries(service_client)
        self._register_s3_events(
            client=service_client,
            endpoint_bridge=None,
            endpoint_url=None,
            client_config=client_config,
            scoped_config=scoped_config,
        )
        self._register_s3express_events(client=service_client)
        self._register_s3_control_events(client=service_client)
        self._register_importexport_events(client=service_client)
        self._register_endpoint_discovery(
            service_client, endpoint_url, client_config
        )
        return service_client

    def create_client_class(self, service_name, api_version=None):
        """Load the service model and return the generated client class."""
        service_model = self._load_service_model(service_name, api_version)
        return self._create_client_class(service_name, service_model)

    def _create_client_class(self, service_name, service_model):
        """Generate a ``BaseClient`` subclass for ``service_model``.

        Handlers of ``creating-client-class.<service-id>`` receive the
        attribute dict and the base class list and may mutate both before
        the class is created.
        """
        class_attributes = self._create_methods(service_model)
        py_name_to_operation_name = self._create_name_mapping(service_model)
        class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
        bases = [BaseClient]
        service_id = service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f'creating-client-class.{service_id}',
            class_attributes=class_attributes,
            base_classes=bases,
        )
        class_name = get_service_module_name(service_model)
        cls = type(str(class_name), tuple(bases), class_attributes)
        return cls

    def _normalize_fips_region(self, region_name, client_config):
        """Strip a ``fips-``/``-fips`` marker from the region name.

        When the region name changes, ``use_fips_endpoint=True`` is merged
        into (or used as) the client config and a warning is logged.

        :return: A ``(region_name, client_config)`` tuple.
        """
        if region_name is not None:
            normalized_region_name = region_name.replace('fips-', '').replace(
                '-fips', ''
            )
            # If region has been transformed then set flag
            if normalized_region_name != region_name:
                config_use_fips_endpoint = Config(use_fips_endpoint=True)
                if client_config:
                    # Keeping endpoint setting client specific
                    client_config = client_config.merge(
                        config_use_fips_endpoint
                    )
                else:
                    client_config = config_use_fips_endpoint
                # Lazy %-style arguments, matching the other log calls in
                # this class; the rendered message is unchanged.
                logger.warning(
                    'transforming region from %s to %s and setting '
                    'use_fips_endpoint to true. client should not '
                    'be configured with a fips psuedo region.',
                    region_name,
                    normalized_region_name,
                )
                region_name = normalized_region_name
        return region_name, client_config

    def _load_service_model(self, service_name, api_version=None):
        """Load the ``service-2`` document and wrap it in a ServiceModel."""
        json_model = self._loader.load_service_model(
            service_name, 'service-2', api_version=api_version
        )
        service_model = ServiceModel(json_model, service_name=service_name)
        return service_model

    def _load_service_endpoints_ruleset(self, service_name, api_version=None):
        """Load the ``endpoint-rule-set-1`` document for the service."""
        return self._loader.load_service_model(
            service_name, 'endpoint-rule-set-1', api_version=api_version
        )

    def _register_retries(self, client):
        """Register retry handlers according to the client's retry mode.

        Unrecognized modes register nothing and record no feature id.
        """
        retry_mode = client.meta.config.retries['mode']
        if retry_mode == 'standard':
            self._register_v2_standard_retries(client)
        elif retry_mode == 'adaptive':
            # Adaptive mode is layered on top of the standard handler.
            self._register_v2_standard_retries(client)
            self._register_v2_adaptive_retries(client)
        elif retry_mode == 'legacy':
            self._register_legacy_retries(client)
        else:
            return
        register_feature_id(f'RETRY_MODE_{retry_mode.upper()}')

    def _register_v2_standard_retries(self, client):
        """Register the standard retry handler on ``client``.

        ``max_attempts`` is only passed along when ``total_max_attempts``
        is present in the client's retry config.
        """
        max_attempts = client.meta.config.retries.get('total_max_attempts')
        kwargs = {'client': client}
        if max_attempts is not None:
            kwargs['max_attempts'] = max_attempts
        standard.register_retry_handler(**kwargs)

    def _register_v2_adaptive_retries(self, client):
        """Register the adaptive retry handler on ``client``."""
        adaptive.register_retry_handler(client)

    def _register_legacy_retries(self, client):
        """Build and register the legacy ``needs-retry`` handler.

        Does nothing when the loader returns no ``_retry`` data.
        """
        endpoint_prefix = client.meta.service_model.endpoint_prefix
        service_id = client.meta.service_model.service_id
        service_event_name = service_id.hyphenize()

        # First, we load the entire retry config for all services,
        # then pull out just the information we need.
        original_config = self._loader.load_data('_retry')
        if not original_config:
            return

        retries = self._transform_legacy_retries(client.meta.config.retries)
        retry_config = self._retry_config_translator.build_retry_config(
            endpoint_prefix,
            original_config.get('retry', {}),
            original_config.get('definitions', {}),
            retries,
        )

        logger.debug(
            "Registering retry handlers for service: %s",
            client.meta.service_model.service_name,
        )
        handler = self._retry_handler_factory.create_retry_handler(
            retry_config, endpoint_prefix
        )
        unique_id = f'retry-config-{service_event_name}'
        client.meta.events.register(
            f"needs-retry.{service_event_name}", handler, unique_id=unique_id
        )

    def _transform_legacy_retries(self, retries):
        """Return a copy of ``retries`` in the shape the legacy handler uses.

        ``total_max_attempts`` (if present) is replaced by ``max_attempts``
        holding that value minus one.  The input mapping is never mutated.

        :return: The transformed copy, or ``None`` when ``retries`` is None.
        """
        if retries is None:
            return None
        copied_args = retries.copy()
        if 'total_max_attempts' in copied_args:
            copied_args['max_attempts'] = (
                copied_args.pop('total_max_attempts') - 1
            )
        return copied_args

    def _get_retry_mode(self, client, config_store):
        """Return the retry mode from the client config or config store.

        Falls back to ``'legacy'`` when neither source provides a value.
        """
        client_retries = client.meta.config.retries
        if (
            client_retries is not None
            and client_retries.get('mode') is not None
        ):
            return client_retries['mode']
        return config_store.get_config_variable('retry_mode') or 'legacy'

    def _register_endpoint_discovery(self, client, endpoint_url, config):
        """Attach endpoint discovery handlers, or a blocking handler.

        Nothing is registered for a custom ``endpoint_url`` or for services
        whose model has no endpoint discovery operation.
        """
        if endpoint_url is not None:
            # Don't register any handlers in the case of a custom endpoint url
            return
        # Only attach handlers if the service supports discovery
        if client.meta.service_model.endpoint_discovery_operation is None:
            return
        events = client.meta.events
        service_id = client.meta.service_model.service_id.hyphenize()
        enabled = False
        if config and config.endpoint_discovery_enabled is not None:
            enabled = config.endpoint_discovery_enabled
        elif self._config_store:
            enabled = self._config_store.get_config_variable(
                'endpoint_discovery_enabled'
            )

        enabled = self._normalize_endpoint_discovery_config(enabled)
        if enabled and self._requires_endpoint_discovery(client, enabled):
            # Only a literal True forces discovery; 'auto' does not.
            discover = enabled is True
            manager = EndpointDiscoveryManager(
                client, always_discover=discover
            )
            handler = EndpointDiscoveryHandler(manager)
            handler.register(events, service_id)
        else:
            events.register(
                'before-parameter-build',
                block_endpoint_discovery_required_operations,
            )

    def _normalize_endpoint_discovery_config(self, enabled):
        """Config must either be a boolean-string or string-literal 'auto'

        :return: ``'auto'`` or a bool.
        :raises InvalidEndpointDiscoveryConfigurationError: For any other
            value.
        """
        if isinstance(enabled, str):
            enabled = enabled.lower().strip()
            if enabled == 'auto':
                return enabled
            elif enabled in ('true', 'false'):
                return ensure_boolean(enabled)
        elif isinstance(enabled, bool):
            return enabled

        raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)

    def _requires_endpoint_discovery(self, client, enabled):
        """Return whether discovery applies for the normalized setting.

        For ``'auto'`` the service model's ``endpoint_discovery_required``
        value decides; otherwise ``enabled`` is returned unchanged.
        """
        if enabled == "auto":
            return client.meta.service_model.endpoint_discovery_required
        return enabled

    def _register_eventbridge_events(
        self, client, endpoint_bridge, endpoint_url
    ):
        """Register the EventBridge signer setter for the ``events`` service.

        ``endpoint_bridge`` is accepted but not used by this method.
        """
        if client.meta.service_model.service_name != 'events':
            return
        EventbridgeSignerSetter(
            endpoint_resolver=self._endpoint_resolver,
            region=client.meta.region_name,
            endpoint_url=endpoint_url,
        ).register(client.meta.events)

    def _register_s3express_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the S3 Express identity resolver for ``s3`` clients.

        Only ``client`` is used; the remaining parameters are accepted but
        ignored here.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3ExpressIdentityResolver(client, RefreshableCredentials).register()

    def _register_s3_events(
        self,
        client,
        endpoint_bridge,
        endpoint_url,
        client_config,
        scoped_config,
    ):
        """Register the S3-specific handlers on ``s3`` clients.

        ``endpoint_bridge`` and ``endpoint_url`` are accepted but not used
        by this method.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3RegionRedirectorv2(None, client).register()
        self._set_s3_presign_signature_version(
            client.meta, client_config, scoped_config
        )
        client.meta.events.register(
            'before-parameter-build.s3', self._inject_s3_input_parameters
        )

    def _register_s3_control_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the ARN parameter handler for ``s3control`` clients.

        Only ``client`` is used; the remaining parameters are accepted but
        ignored here.
        """
        if client.meta.service_model.service_name != 's3control':
            return
        S3ControlArnParamHandlerv2().register(client.meta.events)

    def _set_s3_presign_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Default S3 presigning to sigv2 where that is still appropriate.

        Registers a ``choose-signer.s3`` handler unless the customer
        configured a signature version, the region is unknown, or the
        resolved endpoint does not list ``s3`` as a signature version.
        """
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        provided_signature_version = _get_configured_signature_version(
            's3', client_config, scoped_config
        )
        if provided_signature_version is not None:
            return

        # Check to see if the region is a region that we know about. If we
        # don't know about a region, then we can safely assume it's a new
        # region that is sigv4 only, since all new S3 regions only allow sigv4.
        # The only exception is aws-global. This is a pseudo-region for the
        # global endpoint, we should respect the signature versions it
        # supports, which includes v2.
        regions = self._endpoint_resolver.get_available_endpoints(
            's3', client_meta.partition
        )
        if (
            client_meta.region_name != 'aws-global'
            and client_meta.region_name not in regions
        ):
            return

        # If it is a region we know about, we want to default to sigv2, so here
        # we check to see if it is available.
        endpoint = self._endpoint_resolver.construct_endpoint(
            's3', client_meta.region_name
        )
        signature_versions = endpoint['signatureVersions']
        if 's3' not in signature_versions:
            return

        # We now know that we're in a known region that supports sigv2 and
        # the customer hasn't set a signature version so we default the
        # signature version to sigv2.
        client_meta.events.register(
            'choose-signer.s3', self._default_s3_presign_to_sigv2
        )

    def _inject_s3_input_parameters(self, params, context, **kwargs):
        """Copy selected request parameters into ``context['input_params']``.

        ``context['input_params']`` is always reset; it then receives the
        ``Bucket``, ``Delete``, ``Key`` and ``Prefix`` entries that are
        present in ``params``.
        """
        inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix')
        context['input_params'] = {
            name: params[name] for name in inject_parameters if name in params
        }

    def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
        """
        Returns the sigv2 flavor of a presign signer for an s3 request. This
        is intended to be used to set the default signature version for the
        signer to sigv2. Situations where an asymmetric signature is required
        are the exception, for example MRAP needs v4a.

        :type signature_version: str
        :param signature_version: The current client signature version.

        :return: ``None`` for ``v4a*`` versions; the unchanged value for
            ``v4-s3express*`` versions; ``'s3-query'`` or
            ``'s3-presign-post'`` when the version ends with the matching
            suffix; ``None`` otherwise.
        """
        if signature_version.startswith('v4a'):
            return None

        if signature_version.startswith('v4-s3express'):
            return signature_version

        for suffix in ('-query', '-presign-post'):
            if signature_version.endswith(suffix):
                return f's3{suffix}'
        return None

    def _register_importexport_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Apply the importexport signer default for that service only.

        ``endpoint_bridge`` and ``endpoint_url`` are accepted but not used
        by this method.
        """
        if client.meta.service_model.service_name != 'importexport':
            return
        self._set_importexport_signature_version(
            client.meta, client_config, scoped_config
        )

    def _set_importexport_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Default importexport to sigv4 unless a version was configured."""
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        configured_signature_version = _get_configured_signature_version(
            'importexport', client_config, scoped_config
        )
        if configured_signature_version is not None:
            return

        # importexport has a modeled signatureVersion of v2, but we
        # previously switched to v4 via endpoint.json before endpoint rulesets.
        # Override the model's signatureVersion for backwards compatibility.
        client_meta.events.register(
            'choose-signer.importexport', self._default_signer_to_sigv4
        )

    def _default_signer_to_sigv4(self, signature_version, **kwargs):
        """Event handler that always selects the ``'v4'`` signer."""
        return 'v4'

    def _get_client_args(
        self,
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    ):
        """Delegate to ``ClientArgsCreator`` to build the client kwargs.

        :return: Whatever ``ClientArgsCreator.get_client_args`` returns; it
            is unpacked as keyword arguments into the client class.
        """
        args_creator = ClientArgsCreator(
            self._event_emitter,
            self._user_agent,
            self._response_parser_factory,
            self._loader,
            self._exceptions_factory,
            config_store=self._config_store,
            user_agent_creator=self._user_agent_creator,
        )
        return args_creator.get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )

    def _create_methods(self, service_model):
        """Return a dict of snake_case method name -> generated API method."""
        op_dict = {}
        for operation_name in service_model.operation_names:
            py_operation_name = xform_name(operation_name)
            op_dict[py_operation_name] = self._create_api_method(
                py_operation_name, operation_name, service_model
            )
        return op_dict

    def _create_name_mapping(self, service_model):
        """Return the py_name -> OperationName mapping for every operation."""
        return {
            xform_name(operation_name): operation_name
            for operation_name in service_model.operation_names
        }

    def _create_api_method(
        self, py_operation_name, operation_name, service_model
    ):
        """Create the client method that invokes ``operation_name``.

        The returned function is named ``py_operation_name`` and carries a
        lazily rendered ``ClientMethodDocstring`` as its ``__doc__``.
        """

        def _api_call(self, *args, **kwargs):
            # We're accepting *args so that we can give a more helpful
            # error message than TypeError: _api_call takes exactly
            # 1 argument.
            if args:
                raise TypeError(
                    f"{py_operation_name}() only accepts keyword arguments."
                )
            # The "self" in this scope is referring to the BaseClient.
            return self._make_api_call(operation_name, kwargs)

        _api_call.__name__ = str(py_operation_name)

        # Add the docstring to the client method
        operation_model = service_model.operation_model(operation_name)
        docstring = ClientMethodDocstring(
            operation_model=operation_model,
            method_name=operation_name,
            event_emitter=self._event_emitter,
            method_description=operation_model.documentation,
            example_prefix=f'response = client.{py_operation_name}',
            include_signature=False,
        )
        _api_call.__doc__ = docstring
        return _api_call

    def _evaluate_client_specific_token(self, signing_name):
        """Resolve an auth token for ``signing_name``.

        :return: ``None`` when no resolver is set or ``signing_name`` is
            falsy; otherwise whatever the resolver returns.  Exceptions
            raised by the resolver are not caught here.
        """
        resolver = self._auth_token_resolver
        if not resolver or not signing_name:
            return None

        return resolver(signing_name=signing_name)

626 

627 

class ClientEndpointBridge:
    """Bridges endpoint data and client creation

    This class handles taking out the relevant arguments from the endpoint
    resolver and determining which values to use, taking into account any
    client configuration options and scope configuration options.

    This class also handles determining what, if any, region to use if no
    explicit region setting is provided. For example, Amazon S3 client will
    utilize "us-east-1" by default if no region can be resolved."""

    DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
    _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']

    def __init__(
        self,
        endpoint_resolver,
        scoped_config=None,
        client_config=None,
        default_endpoint=None,
        service_signing_name=None,
        config_store=None,
        service_signature_version=None,
    ):
        """Store the resolver and configuration sources used by ``resolve``.

        A falsy ``default_endpoint`` falls back to ``DEFAULT_ENDPOINT``.
        """
        self.service_signing_name = service_signing_name
        self.endpoint_resolver = endpoint_resolver
        self.scoped_config = scoped_config
        self.client_config = client_config
        self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
        self.config_store = config_store
        self.service_signature_version = service_signature_version

    def resolve(
        self, service_name, region_name=None, endpoint_url=None, is_secure=True
    ):
        """Resolve endpoint and signing information for a service.

        :return: A dict with the keys ``service_name``, ``region_name``,
            ``signing_region``, ``signing_name``, ``endpoint_url``,
            ``signature_version`` and ``metadata``.
        """
        region_name = self._check_default_region(service_name, region_name)
        use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
            service_name
        )
        use_fips_endpoint = self._resolve_endpoint_variant_config_var(
            'use_fips_endpoint'
        )
        resolved = self.endpoint_resolver.construct_endpoint(
            service_name,
            region_name,
            use_dualstack_endpoint=use_dualstack_endpoint,
            use_fips_endpoint=use_fips_endpoint,
        )

        # If we can't resolve the region, we'll attempt to get a global
        # endpoint for non-regionalized services (iam, route53, etc)
        if not resolved:
            # TODO: fallback partition_name should be configurable in the
            # future for users to define as needed.
            resolved = self.endpoint_resolver.construct_endpoint(
                service_name,
                region_name,
                partition_name='aws',
                use_dualstack_endpoint=use_dualstack_endpoint,
                use_fips_endpoint=use_fips_endpoint,
            )

        if resolved:
            return self._create_endpoint(
                resolved, service_name, region_name, endpoint_url, is_secure
            )
        return self._assume_endpoint(
            service_name, region_name, endpoint_url, is_secure
        )

    def resolver_uses_builtin_data(self):
        """Return the resolver's ``uses_builtin_data`` attribute."""
        return self.endpoint_resolver.uses_builtin_data

    def _check_default_region(self, service_name, region_name):
        """Return the explicit region, else the client config's region.

        :return: The chosen region name, or ``None`` when neither source
            provides one.  ``service_name`` is not consulted.
        """
        if region_name is not None:
            return region_name
        # Use the client_config region if no explicit region was provided.
        if self.client_config and self.client_config.region_name is not None:
            return self.client_config.region_name
        return None

    def _create_endpoint(
        self, resolved, service_name, region_name, endpoint_url, is_secure
    ):
        """Build the result dict from a successfully resolved endpoint.

        A URL is only derived from ``resolved`` when the caller did not
        supply ``endpoint_url``.
        """
        region_name, signing_region = self._pick_region_values(
            resolved, region_name, endpoint_url
        )
        if endpoint_url is None:
            endpoint_url = self._make_url(
                resolved.get('hostname'),
                is_secure,
                resolved.get('protocols', []),
            )
        signature_version = self._resolve_signature_version(
            service_name, resolved
        )
        signing_name = self._resolve_signing_name(service_name, resolved)
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=signing_region,
            signing_name=signing_name,
            endpoint_url=endpoint_url,
            metadata=resolved,
            signature_version=signature_version,
        )

    def _resolve_endpoint_variant_config_var(self, config_var):
        """Look up an endpoint variant flag such as ``use_fips_endpoint``.

        :return: The client config value when it is not ``None``; otherwise
            the config store value when a store is set; otherwise ``False``.
        """
        client_config = self.client_config
        config_val = False

        # Client configuration arg has precedence
        if client_config and getattr(client_config, config_var) is not None:
            return getattr(client_config, config_var)
        elif self.config_store is not None:
            # Check config store
            config_val = self.config_store.get_config_variable(config_var)
        return config_val

    def _resolve_use_dualstack_endpoint(self, service_name):
        """Resolve dualstack usage, preferring the S3-specific setting."""
        s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
        if s3_dualstack_mode is not None:
            return s3_dualstack_mode
        return self._resolve_endpoint_variant_config_var(
            'use_dualstack_endpoint'
        )

    def _is_s3_dualstack_mode(self, service_name):
        """Return the S3-specific ``use_dualstack_endpoint`` setting.

        :return: ``None`` when the service is not dualstack-customized or
            no setting is found; the raw client config value when present
            there; ``True`` when the scoped config enables it.
        """
        if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
            return None
        # TODO: This normalization logic is duplicated from the
        # ClientArgsCreator class.  Consolidate everything to
        # ClientArgsCreator.  _resolve_signature_version also has similarly
        # duplicated logic.
        client_config = self.client_config
        if (
            client_config is not None
            and client_config.s3 is not None
            and 'use_dualstack_endpoint' in client_config.s3
        ):
            # Client config trumps scoped config.
            return client_config.s3['use_dualstack_endpoint']
        if self.scoped_config is not None:
            enabled = self.scoped_config.get('s3', {}).get(
                'use_dualstack_endpoint'
            )
            if enabled in (True, 'True', 'true'):
                return True
        return None

    def _assume_endpoint(
        self, service_name, region_name, endpoint_url, is_secure
    ):
        """Build a result when the resolver returned no endpoint.

        The hostname comes from the ``default_endpoint`` template unless
        ``endpoint_url`` was supplied; signing assumes ``v4`` is available.
        """
        if endpoint_url is None:
            # Expand the default hostname URI template.
            hostname = self.default_endpoint.format(
                service=service_name, region=region_name
            )
            endpoint_url = self._make_url(
                hostname, is_secure, ['http', 'https']
            )
        # Lazy %-style arguments so the message is only rendered when debug
        # logging is enabled; the rendered text is unchanged.
        logger.debug(
            'Assuming an endpoint for %s, %s: %s',
            service_name,
            region_name,
            endpoint_url,
        )
        # We still want to allow the user to provide an explicit version.
        signature_version = self._resolve_signature_version(
            service_name, {'signatureVersions': ['v4']}
        )
        signing_name = self._resolve_signing_name(service_name, resolved={})
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=region_name,
            signing_name=signing_name,
            signature_version=signature_version,
            endpoint_url=endpoint_url,
            metadata={},
        )

    def _create_result(
        self,
        service_name,
        region_name,
        signing_region,
        signing_name,
        endpoint_url,
        signature_version,
        metadata,
    ):
        """Pack the resolved values into the dict returned by ``resolve``."""
        return {
            'service_name': service_name,
            'region_name': region_name,
            'signing_region': signing_region,
            'signing_name': signing_name,
            'endpoint_url': endpoint_url,
            'signature_version': signature_version,
            'metadata': metadata,
        }

    def _make_url(self, hostname, is_secure, supported_protocols):
        """Return ``<scheme>://<hostname>``.

        ``https`` is used only when ``is_secure`` is truthy and ``'https'``
        is among ``supported_protocols``; otherwise ``http``.
        """
        if is_secure and 'https' in supported_protocols:
            scheme = 'https'
        else:
            scheme = 'http'
        return f'{scheme}://{hostname}'

    def _resolve_signing_name(self, service_name, resolved):
        """Pick the signing name for the service.

        Precedence: the endpoint's ``credentialScope.service``, then the
        modeled signing name, then ``service_name`` itself.
        """
        # CredentialScope overrides everything else.
        if (
            'credentialScope' in resolved
            and 'service' in resolved['credentialScope']
        ):
            return resolved['credentialScope']['service']
        # Use the signingName from the model if present.
        if self.service_signing_name:
            return self.service_signing_name
        # Just assume is the same as the service name.
        return service_name

    def _pick_region_values(self, resolved, region_name, endpoint_url):
        """Return the ``(region_name, signing_region)`` pair to use.

        With a caller supplied ``endpoint_url`` both values are the given
        ``region_name``; otherwise they come from ``resolved``.
        """
        signing_region = region_name
        if endpoint_url is None:
            # Do not use the region name or signing name from the resolved
            # endpoint if the user explicitly provides an endpoint_url. This
            # would happen if we resolve to an endpoint where the service has
            # a "defaults" section that overrides all endpoint with a single
            # hostname and credentialScope. This has been the case historically
            # for how STS has worked. The only way to resolve an STS endpoint
            # was to provide a region_name and an endpoint_url. In that case,
            # we would still resolve an endpoint, but we would not use the
            # resolved endpointName or signingRegion because we want to allow
            # custom endpoints.
            region_name = resolved['endpointName']
            signing_region = region_name
            if (
                'credentialScope' in resolved
                and 'region' in resolved['credentialScope']
            ):
                signing_region = resolved['credentialScope']['region']
        return region_name, signing_region

    def _resolve_signature_version(self, service_name, resolved):
        """Choose the signature version for the service.

        :raises UnknownSignatureVersionError: When none of the candidate
            versions is a key of ``AUTH_TYPE_MAPS``.
        """
        configured_version = _get_configured_signature_version(
            service_name, self.client_config, self.scoped_config
        )
        if configured_version is not None:
            return configured_version

        # These have since added the "auth" key to the service model
        # with "aws.auth#sigv4", but preserve existing behavior from
        # when we preferred endpoints.json over the service models
        if service_name in ('s3', 's3-control'):
            return 's3v4'

        if self.service_signature_version is not None:
            # Prefer the service model
            potential_versions = [self.service_signature_version]
        else:
            # Fall back to endpoints.json to preserve existing behavior, which
            # may be useful for users who have custom service models
            potential_versions = resolved.get('signatureVersions', [])
            # This was added for the V2 -> V4 transition,
            # for services that added V4 after V2 in endpoints.json
            if 'v4' in potential_versions:
                return 'v4'
        # Now just iterate over the signature versions in order until we
        # find the first one that is known to Botocore.
        for known in potential_versions:
            if known in AUTH_TYPE_MAPS:
                return known

        raise UnknownSignatureVersionError(
            signature_version=potential_versions
        )

901 

902 

class BaseClient:
    """Base class from which the client creator builds service clients.

    An instance ties together the collaborators handed to ``__init__``
    (serializer, endpoint, response parser, request signer, ...) and
    drives a single API call through them in ``_make_api_call``.
    Client-level metadata is exposed through the ``meta`` attribute.
    """

    # This is actually reassigned with the py->op_name mapping
    # when the client creator creates the subclass.  This value is used
    # because calls such as client.get_paginator('list_objects') use the
    # snake_case name, but we need to know the ListObjects form.
    # xform_name() does the ListObjects->list_objects conversion, but
    # we need the reverse mapping here.
    _PY_TO_OP_NAME = {}

    def __init__(
        self,
        serializer,
        endpoint,
        response_parser,
        event_emitter,
        request_signer,
        service_model,
        loader,
        client_config,
        partition,
        exceptions_factory,
        endpoint_ruleset_resolver=None,
        user_agent_creator=None,
    ):
        """Store the collaborators and register the client's handlers.

        :param serializer: Turns API parameters into a request dict
            (``serialize_to_request``).
        :param endpoint: Sends requests (``make_request``); its ``host``
            becomes ``meta.endpoint_url``.
        :param response_parser: Stored on the client as
            ``_response_parser``.
        :param event_emitter: Event system exposed as ``meta.events``.
        :param request_signer: Its ``handler`` is registered for the
            ``request-created`` event.
        :param service_model: Model of the service, exposed as
            ``meta.service_model``.
        :param loader: Used to load paginator and waiter definitions.
        :param client_config: Client configuration, exposed as
            ``meta.config``.
        :param partition: Exposed as ``meta.partition``.
        :param exceptions_factory: Builds the object returned by the
            ``exceptions`` property.
        :param endpoint_ruleset_resolver: Optional resolver consulted by
            ``_resolve_endpoint_ruleset``; when ``None`` the endpoint URL
            from ``meta`` is used.
        :param user_agent_creator: Optional user agent builder; when
            ``None`` one is created from the environment and the client
            config.
        """
        self._serializer = serializer
        self._endpoint = endpoint
        self._ruleset_resolver = endpoint_ruleset_resolver
        self._response_parser = response_parser
        self._request_signer = request_signer
        # Lazily filled with 'page_config' and 'waiter_config'.
        self._cache = {}
        self._loader = loader
        self._client_config = client_config
        self.meta = ClientMeta(
            event_emitter,
            self._client_config,
            endpoint.host,
            service_model,
            self._PY_TO_OP_NAME,
            partition,
        )
        self._exceptions_factory = exceptions_factory
        # Built on first access of the ``exceptions`` property.
        self._exceptions = None
        self._user_agent_creator = user_agent_creator
        if self._user_agent_creator is None:
            self._user_agent_creator = (
                UserAgentString.from_environment().with_client_config(
                    self._client_config
                )
            )
        self._register_handlers()

    def __getattr__(self, item):
        """Resolve unknown attributes through a ``getattr.*`` event.

        Only called when normal attribute lookup fails.  Emits
        ``getattr.<service-id>.<item>`` and returns the first handler
        response that is not ``None``.

        :raises AttributeError: If no handler supplies a value, or if the
            instance has no ``meta`` attribute yet.
        """
        # Building the event name needs self.meta.  If ``meta`` itself is
        # missing (for example on an instance created with ``__new__``
        # only), looking it up would re-enter this method without end and
        # surface as RecursionError.  Report a plain AttributeError so
        # hasattr()/getattr() probes behave normally.
        try:
            object.__getattribute__(self, 'meta')
        except AttributeError:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{item}'"
            ) from None

        service_id = self._service_model.service_id.hyphenize()
        event_name = f'getattr.{service_id}.{item}'

        handler, event_response = self.meta.events.emit_until_response(
            event_name, client=self
        )

        if event_response is not None:
            return event_response

        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{item}'"
        )

    def close(self):
        """Closes underlying endpoint connections."""
        self._endpoint.close()

    def _register_handlers(self):
        """Register the ``request-created`` handlers for this client."""
        # Register the handler required to sign requests.
        service_id = self.meta.service_model.service_id.hyphenize()
        self.meta.events.register(
            f"request-created.{service_id}", self._request_signer.handler
        )
        # Rebuild user agent string right before request is sent
        # to ensure all registered features are included.
        self.meta.events.register_last(
            f"request-created.{service_id}",
            self._user_agent_creator.rebuild_and_replace_user_agent_handler,
        )

    @property
    def _service_model(self):
        """Shortcut for ``self.meta.service_model``."""
        return self.meta.service_model

    @with_current_context()
    def _make_api_call(self, operation_name, api_params):
        """Run one API operation and return its parsed response.

        The call is recorded with the history recorder, the parameters
        are passed through the parameter events, the endpoint is
        resolved, the request dict is built and the ``before-call`` event
        is emitted.  If a ``before-call`` handler returns a response it
        is used as-is; otherwise the request is sent through the
        endpoint.  ``after-call`` is emitted in both cases.

        :param operation_name: Name of the operation in the service model.
        :param api_params: Parameters supplied by the caller.
        :returns: The parsed response.
        :raises: The exception class that ``self.exceptions.from_code``
            returns for the error code when the HTTP status code is 300
            or above.
        """
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.resolved_auth_type,
            'unsigned_payload': operation_model.unsigned_payload,
            'auth_options': self._service_model.metadata.get('auth'),
        }

        api_params = self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)

        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = self.meta.events.emit_until_response(
            f'before-call.{service_id}.{operation_name}',
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )

        if event_response is not None:
            # A before-call handler supplied the response; nothing is sent.
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = self._make_request(
                operation_model, request_dict, request_context
            )

        self.meta.events.emit(
            f'after-call.{service_id}.{operation_name}',
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )

        if http.status_code < 300:
            return parsed_response

        error_info = parsed_response.get("Error", {})
        # "QueryErrorCode", when present and truthy, wins over "Code".
        error_code = error_info.get("QueryErrorCode") or error_info.get(
            "Code"
        )
        error_class = self.exceptions.from_code(error_code)
        raise error_class(parsed_response, operation_name)

    def _make_request(self, operation_model, request_dict, request_context):
        """Send the request through the endpoint.

        Any exception raised while sending is announced through the
        ``after-call-error`` event and then re-raised unchanged.
        """
        try:
            return self._endpoint.make_request(operation_model, request_dict)
        except Exception as e:
            self.meta.events.emit(
                f'after-call-error.{self._service_model.service_id.hyphenize()}.{operation_model.name}',
                exception=e,
                context=request_context,
            )
            raise

    def _convert_to_request_dict(
        self,
        api_params,
        operation_model,
        endpoint_url,
        context=None,
        headers=None,
        set_user_agent_header=True,
    ):
        """Serialize ``api_params`` and prepare the resulting request dict.

        :param api_params: Parameters for the operation.
        :param operation_model: Model of the operation being called.
        :param endpoint_url: Endpoint URL passed to
            ``prepare_request_dict``.
        :param context: Request context passed to
            ``prepare_request_dict``.
        :param headers: Optional extra headers merged into the serialized
            headers.
        :param set_user_agent_header: When false, ``None`` is passed as
            the user agent instead of the client's user agent string.
        :returns: The prepared request dict.
        """
        request_dict = self._serializer.serialize_to_request(
            api_params, operation_model
        )
        if not self._client_config.inject_host_prefix:
            request_dict.pop('host_prefix', None)
        if headers is not None:
            request_dict['headers'].update(headers)
        if set_user_agent_header:
            user_agent = self._user_agent_creator.to_string()
        else:
            user_agent = None
        prepare_request_dict(
            request_dict,
            endpoint_url=endpoint_url,
            user_agent=user_agent,
            context=context,
        )
        return request_dict

    def _emit_api_params(self, api_params, operation_model, context):
        """Pass the API parameters through the parameter events.

        :returns: The parameters to use for the call: the first non-None
            ``provide-client-params`` response, or the original
            ``api_params`` if there is none.
        """
        operation_name = operation_model.name

        # Emit an event that allows users to modify the parameters at the
        # beginning of the method. It allows handlers to modify existing
        # parameters or return a new set of parameters to use.
        service_id = self._service_model.service_id.hyphenize()
        responses = self.meta.events.emit(
            f'provide-client-params.{service_id}.{operation_name}',
            params=api_params,
            model=operation_model,
            context=context,
        )
        api_params = first_non_none_response(responses, default=api_params)

        # Responses of this event are ignored; handlers can only act on
        # the objects they are handed.
        self.meta.events.emit(
            f'before-parameter-build.{service_id}.{operation_name}',
            params=api_params,
            model=operation_model,
            context=context,
        )
        return api_params

    def _resolve_endpoint_ruleset(
        self,
        operation_model,
        params,
        request_context,
        ignore_signing_region=False,
    ):
        """Returns endpoint URL, additional headers and endpoint properties
        from EndpointRulesetResolver for the given operation and params. If
        the ruleset resolver is not available, for example because the
        service has no endpoints ruleset file, the legacy endpoint
        resolver's value is returned together with an empty headers dict
        and an empty properties dict.

        Use ignore_signing_region for generating presigned URLs or any other
        situation where the signing region information from the ruleset
        resolver should be ignored.

        Returns a tuple of URL, headers dictionary and properties
        dictionary. Additionally, the request_context dict is modified in
        place with any signing information returned from the ruleset
        resolver.
        """
        if self._ruleset_resolver is None:
            endpoint_url = self.meta.endpoint_url
            additional_headers = {}
            endpoint_properties = {}
        else:
            endpoint_info = self._ruleset_resolver.construct_endpoint(
                operation_model=operation_model,
                call_args=params,
                request_context=request_context,
            )
            endpoint_url = endpoint_info.url
            additional_headers = endpoint_info.headers
            endpoint_properties = endpoint_info.properties
            # If authSchemes is present, overwrite default auth type and
            # signing context derived from service model.
            auth_schemes = endpoint_info.properties.get('authSchemes')
            if auth_schemes is not None:
                auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx(
                    auth_schemes
                )
                auth_type, signing_context = auth_info
                request_context['auth_type'] = auth_type
                if 'region' in signing_context and ignore_signing_region:
                    del signing_context['region']
                if 'signing' in request_context:
                    request_context['signing'].update(signing_context)
                else:
                    request_context['signing'] = signing_context

        return endpoint_url, additional_headers, endpoint_properties

    def get_paginator(self, operation_name):
        """Create a paginator for an operation.

        :type operation_name: string
        :param operation_name: The operation name.  This is the same name
            as the method name on the client.  For example, if the
            method name is ``create_foo``, and you'd normally invoke the
            operation as ``client.create_foo(**kwargs)``, if the
            ``create_foo`` operation can be paginated, you can use the
            call ``client.get_paginator("create_foo")``.

        :raise OperationNotPageableError: Raised if the operation is not
            pageable.  You can use the ``client.can_paginate`` method to
            check if an operation is pageable.

        :rtype: ``botocore.paginate.Paginator``
        :return: A paginator object.

        """
        if not self.can_paginate(operation_name):
            raise OperationNotPageableError(operation_name=operation_name)

        actual_operation_name = self._PY_TO_OP_NAME[operation_name]

        # Create a new paginate method that will serve as a proxy to
        # the underlying Paginator.paginate method. This is needed to
        # attach a docstring to the method.
        def paginate(self, **kwargs):
            return Paginator.paginate(self, **kwargs)

        # can_paginate() above has populated the 'page_config' cache.
        paginator_config = self._cache['page_config'][actual_operation_name]
        # Add the docstring for the paginate method.
        paginate.__doc__ = PaginatorDocstring(
            paginator_name=actual_operation_name,
            event_emitter=self.meta.events,
            service_model=self.meta.service_model,
            paginator_config=paginator_config,
            include_signature=False,
        )

        # Rename the paginator class based on the type of paginator.
        service_module_name = get_service_module_name(
            self.meta.service_model
        )
        paginator_class_name = (
            f"{service_module_name}.Paginator.{actual_operation_name}"
        )

        # Create the new paginator class
        documented_paginator_cls = type(
            paginator_class_name, (Paginator,), {'paginate': paginate}
        )

        operation_model = self._service_model.operation_model(
            actual_operation_name
        )
        paginator = documented_paginator_cls(
            getattr(self, operation_name),
            paginator_config,
            operation_model,
        )
        return paginator

    def can_paginate(self, operation_name):
        """Check if an operation can be paginated.

        :type operation_name: string
        :param operation_name: The operation name.  This is the same name
            as the method name on the client.  For example, if the
            method name is ``create_foo``, and you'd normally invoke the
            operation as ``client.create_foo(**kwargs)``, if the
            ``create_foo`` operation can be paginated, you can use the
            call ``client.get_paginator("create_foo")``.

        :return: ``True`` if the operation can be paginated,
            ``False`` otherwise.

        """
        if 'page_config' not in self._cache:
            try:
                page_config = self._loader.load_service_model(
                    self._service_model.service_name,
                    'paginators-1',
                    self._service_model.api_version,
                )['pagination']
                self._cache['page_config'] = page_config
            except DataNotFoundError:
                # No paginator definitions for this service: cache an
                # empty config so the loader is not asked again.
                self._cache['page_config'] = {}
        # NOTE: an operation_name that is not a key of _PY_TO_OP_NAME
        # makes this lookup raise instead of returning False.
        actual_operation_name = self._PY_TO_OP_NAME[operation_name]
        return actual_operation_name in self._cache['page_config']

    def _get_waiter_config(self):
        """Return the waiter definitions, loading them on first use.

        An empty dict is cached and returned when the loader has no
        ``waiters-2`` data for the service.
        """
        if 'waiter_config' not in self._cache:
            try:
                waiter_config = self._loader.load_service_model(
                    self._service_model.service_name,
                    'waiters-2',
                    self._service_model.api_version,
                )
                self._cache['waiter_config'] = waiter_config
            except DataNotFoundError:
                self._cache['waiter_config'] = {}
        return self._cache['waiter_config']

    def get_waiter(self, waiter_name):
        """Returns an object that can wait for some condition.

        :type waiter_name: str
        :param waiter_name: The name of the waiter to get. See the waiters
            section of the service docs for a list of available waiters.

        :returns: The specified waiter object.
        :rtype: ``botocore.waiter.Waiter``
        :raises ValueError: If the service has no waiter configuration or
            no waiter with that name.
        """
        config = self._get_waiter_config()
        if not config:
            raise ValueError(f"Waiter does not exist: {waiter_name}")
        model = waiter.WaiterModel(config)
        # Callers pass the xform_name() form; the model uses its own names.
        mapping = {xform_name(name): name for name in model.waiter_names}
        if waiter_name not in mapping:
            raise ValueError(f"Waiter does not exist: {waiter_name}")

        return waiter.create_waiter_with_client(
            mapping[waiter_name], model, self
        )

    @CachedProperty
    def waiter_names(self):
        """Returns a list of all available waiters."""
        config = self._get_waiter_config()
        if not config:
            return []
        model = waiter.WaiterModel(config)
        # Waiter configs is a dict, we just want the waiter names
        # which are the keys in the dict.
        return [xform_name(name) for name in model.waiter_names]

    @property
    def exceptions(self):
        """Client exceptions object, created on first access."""
        if self._exceptions is None:
            self._exceptions = self._load_exceptions()
        return self._exceptions

    def _load_exceptions(self):
        """Build the client exceptions object for this service model."""
        return self._exceptions_factory.create_client_exceptions(
            self._service_model
        )

    def _get_credentials(self):
        """
        This private interface is subject to abrupt breaking changes, including
        removal, in any botocore release.
        """
        return self._request_signer._credentials

1352 

1353 

class ClientMeta:
    """Read-only bundle of client-level metadata.

    The values live on a separate object for two reasons:

    * It gives clients a place for advanced functionality.
    * It keeps additional client attributes in their own namespace,
      apart from the operation names that are mapped to client methods
      at runtime, so the two can never collide.

    """

    def __init__(
        self,
        events,
        client_config,
        endpoint_url,
        service_model,
        method_to_api_mapping,
        partition,
    ):
        # ``events`` is the only value stored as a plain public attribute;
        # everything else is exposed through the properties below.
        self.events = events
        self._service_model = service_model
        self._client_config = client_config
        self._endpoint_url = endpoint_url
        self._partition = partition
        self._method_to_api_mapping = method_to_api_mapping

    @property
    def config(self):
        """The client configuration object."""
        return self._client_config

    @property
    def region_name(self):
        """The ``region_name`` of the client configuration."""
        return self._client_config.region_name

    @property
    def endpoint_url(self):
        """The endpoint URL given at construction time."""
        return self._endpoint_url

    @property
    def service_model(self):
        """The service model given at construction time."""
        return self._service_model

    @property
    def method_to_api_mapping(self):
        """The method-name to API-name mapping given at construction."""
        return self._method_to_api_mapping

    @property
    def partition(self):
        """The partition given at construction time."""
        return self._partition

1406 

1407 

def _get_configured_signature_version(
    service_name, client_config, scoped_config
):
    """
    Look up a signature version that was configured by hand.

    ``client_config.signature_version`` is consulted first.  After that,
    the ``signature_version`` entry of the ``service_name`` section of
    ``scoped_config`` is used, provided that section is a dict and the
    entry is truthy.

    :returns: the customer configured signature version, or None if no
        signature version was configured.
    """
    # A value on the client config beats every other source.
    if client_config and client_config.signature_version is not None:
        return client_config.signature_version

    # Scoped config is the only remaining place an override can live.
    if scoped_config is None:
        return None

    # Services can carry their own section in the config file; the
    # override, if any, is stored there.
    section = scoped_config.get(service_name)
    if section is None or not isinstance(section, dict):
        return None

    override = section.get('signature_version')
    if not override:
        return None

    logger.debug(
        "Switching signature version for service %s "
        "to version %s based on config file override.",
        service_name,
        override,
    )
    return override
1436 return None