Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/client.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

560 statements  

1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import logging 

14 

15from botocore import ( 

16 UNSIGNED, # noqa: F401 

17 waiter, 

18 xform_name, 

19) 

20from botocore.args import ClientArgsCreator 

21from botocore.auth import AUTH_TYPE_MAPS, resolve_auth_type 

22from botocore.awsrequest import prepare_request_dict 

23from botocore.compress import maybe_compress_request 

24from botocore.config import Config 

25from botocore.context import with_current_context 

26from botocore.credentials import RefreshableCredentials 

27from botocore.discovery import ( 

28 EndpointDiscoveryHandler, 

29 EndpointDiscoveryManager, 

30 block_endpoint_discovery_required_operations, 

31) 

32from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring 

33from botocore.exceptions import ( 

34 ClientError, # noqa: F401 

35 DataNotFoundError, 

36 InvalidEndpointDiscoveryConfigurationError, 

37 OperationNotPageableError, 

38 UnknownServiceError, 

39 UnknownSignatureVersionError, 

40) 

41from botocore.history import get_global_history_recorder 

42from botocore.hooks import first_non_none_response 

43from botocore.httpchecksum import ( 

44 apply_request_checksum, 

45 resolve_checksum_context, 

46) 

47from botocore.model import ServiceModel 

48from botocore.paginate import Paginator 

49from botocore.retries import adaptive, standard 

50from botocore.useragent import UserAgentString, register_feature_id 

51from botocore.utils import ( 

52 CachedProperty, 

53 EventbridgeSignerSetter, 

54 S3ArnParamHandler, # noqa: F401 

55 S3ControlArnParamHandler, # noqa: F401 

56 S3ControlArnParamHandlerv2, 

57 S3ControlEndpointSetter, # noqa: F401 

58 S3EndpointSetter, # noqa: F401 

59 S3ExpressIdentityResolver, 

60 S3RegionRedirector, # noqa: F401 

61 S3RegionRedirectorv2, 

62 ensure_boolean, 

63 get_service_module_name, 

64) 

65 

# Module-scoped logger, named after this module so that applications can
# filter or silence client-creation messages independently.
logger = logging.getLogger(__name__)

# Shared history recorder handle obtained once at import time.
history_recorder = get_global_history_recorder()

68 

69 

class ClientCreator:
    """Creates client objects for a service."""

    def __init__(
        self,
        loader,
        endpoint_resolver,
        user_agent,
        event_emitter,
        retry_handler_factory,
        retry_config_translator,
        response_parser_factory=None,
        exceptions_factory=None,
        config_store=None,
        user_agent_creator=None,
        auth_token_resolver=None,
    ):
        """Store the collaborators that are used later to build clients.

        Nothing is loaded or resolved here; every argument is simply kept
        on the instance for use by ``create_client`` and its helpers.

        :param loader: Object used to load service models and data files
            (``load_service_model`` / ``load_data`` are called on it).
        :param endpoint_resolver: Resolver handed to the endpoint bridge
            and used for the S3 presign signature defaulting.
        :param user_agent: Passed through to ``ClientArgsCreator``.
        :param event_emitter: Emitter used for ``choose-service-name`` and
            ``creating-client-class.*`` events.
        :param retry_handler_factory: Factory used for legacy retries.
        :param retry_config_translator: Translator used for legacy retries.
        :param response_parser_factory: Passed to ``ClientArgsCreator``.
        :param exceptions_factory: Passed to ``ClientArgsCreator``.
        :param config_store: Optional store queried for config variables.
        :param user_agent_creator: Passed to ``ClientArgsCreator``.
        :param auth_token_resolver: Optional callable invoked as
            ``resolver(signing_name=...)`` to obtain a per-service token.
        """
        self._loader = loader
        self._endpoint_resolver = endpoint_resolver
        self._user_agent = user_agent
        self._event_emitter = event_emitter
        self._retry_handler_factory = retry_handler_factory
        self._retry_config_translator = retry_config_translator
        self._response_parser_factory = response_parser_factory
        self._exceptions_factory = exceptions_factory
        # TODO: Migrate things away from scoped_config in favor of the
        # config_store.  The config store can pull things from both the scoped
        # config and environment variables (and potentially more in the
        # future).
        self._config_store = config_store
        self._user_agent_creator = user_agent_creator
        self._auth_token_resolver = auth_token_resolver

    def create_client(
        self,
        service_name,
        region_name,
        is_secure=True,
        endpoint_url=None,
        verify=None,
        credentials=None,
        scoped_config=None,
        api_version=None,
        client_config=None,
        auth_token=None,
    ):
        """Build, configure and return a client instance for a service.

        The steps, in order: let ``choose-service-name`` handlers rename
        the service, load its model (and endpoint ruleset when one
        exists), build the client class, normalize FIPS pseudo-regions,
        resolve constructor arguments, instantiate the class and finally
        register retry, S3, S3 Express, S3 Control, importexport and
        endpoint-discovery handlers on the new client.

        :return: An instance of the dynamically created client class.
        """
        responses = self._event_emitter.emit(
            'choose-service-name', service_name=service_name
        )
        service_name = first_non_none_response(responses, default=service_name)
        service_model = self._load_service_model(service_name, api_version)
        try:
            endpoints_ruleset_data = self._load_service_endpoints_ruleset(
                service_name, api_version
            )
            partition_data = self._loader.load_data('partitions')
        except UnknownServiceError:
            # No ruleset is not fatal: both values are cleared together so
            # downstream code sees a consistent "legacy routing" state.
            endpoints_ruleset_data = None
            partition_data = None
            logger.info(
                'No endpoints ruleset found for service %s, falling back to '
                'legacy endpoint routing.',
                service_name,
            )

        cls = self._create_client_class(service_name, service_model)
        region_name, client_config = self._normalize_fips_region(
            region_name, client_config
        )
        # Prefer the modeled "auth" list; fall back to "signatureVersion".
        if auth := service_model.metadata.get('auth'):
            service_signature_version = resolve_auth_type(auth)
        else:
            service_signature_version = service_model.metadata.get(
                'signatureVersion'
            )
        endpoint_bridge = ClientEndpointBridge(
            self._endpoint_resolver,
            scoped_config,
            client_config,
            service_signing_name=service_model.metadata.get('signingName'),
            config_store=self._config_store,
            service_signature_version=service_signature_version,
        )
        # A token produced by the resolver replaces the caller's auth_token.
        if token := self._evaluate_client_specific_token(
            service_model.signing_name
        ):
            auth_token = token
        client_args = self._get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )
        service_client = cls(**client_args)
        self._register_retries(service_client)
        # endpoint_bridge / endpoint_url are intentionally passed as None;
        # _register_s3_events does not read either of them.
        self._register_s3_events(
            client=service_client,
            endpoint_bridge=None,
            endpoint_url=None,
            client_config=client_config,
            scoped_config=scoped_config,
        )
        self._register_s3express_events(client=service_client)
        self._register_s3_control_events(client=service_client)
        self._register_importexport_events(client=service_client)
        self._register_endpoint_discovery(
            service_client, endpoint_url, client_config
        )
        return service_client

    def create_client_class(self, service_name, api_version=None):
        """Load the service model and return the generated client class."""
        service_model = self._load_service_model(service_name, api_version)
        return self._create_client_class(service_name, service_model)

    def _create_client_class(self, service_name, service_model):
        """Create a ``BaseClient`` subclass with one method per operation.

        Handlers of ``creating-client-class.<service-id>`` receive the
        mutable ``class_attributes`` dict and ``base_classes`` list and may
        alter them before the class is created.
        """
        class_attributes = self._create_methods(service_model)
        py_name_to_operation_name = self._create_name_mapping(service_model)
        class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
        bases = [BaseClient]
        service_id = service_model.service_id.hyphenize()
        self._event_emitter.emit(
            f'creating-client-class.{service_id}',
            class_attributes=class_attributes,
            base_classes=bases,
        )
        class_name = get_service_module_name(service_model)
        cls = type(str(class_name), tuple(bases), class_attributes)
        return cls

    def _normalize_fips_region(self, region_name, client_config):
        """Strip ``fips-`` / ``-fips`` from a region and enable FIPS instead.

        When the region name changes, ``use_fips_endpoint=True`` is merged
        into ``client_config`` (or becomes the config if none was given)
        and a warning is logged.

        :return: ``(region_name, client_config)``, unchanged when the
            region is ``None`` or contains no FIPS marker.
        """
        if region_name is not None:
            normalized_region_name = region_name.replace('fips-', '').replace(
                '-fips', ''
            )
            # If region has been transformed then set flag
            if normalized_region_name != region_name:
                config_use_fips_endpoint = Config(use_fips_endpoint=True)
                if client_config:
                    # Keeping endpoint setting client specific
                    client_config = client_config.merge(
                        config_use_fips_endpoint
                    )
                else:
                    client_config = config_use_fips_endpoint
                logger.warning(
                    'transforming region from %s to %s and setting '
                    'use_fips_endpoint to true. client should not '
                    'be configured with a fips psuedo region.',
                    region_name,
                    normalized_region_name,
                )
                region_name = normalized_region_name
        return region_name, client_config

    def _load_service_model(self, service_name, api_version=None):
        """Load the ``service-2`` document and wrap it in ``ServiceModel``."""
        json_model = self._loader.load_service_model(
            service_name, 'service-2', api_version=api_version
        )
        service_model = ServiceModel(json_model, service_name=service_name)
        return service_model

    def _load_service_endpoints_ruleset(self, service_name, api_version=None):
        """Load the raw ``endpoint-rule-set-1`` document for a service."""
        return self._loader.load_service_model(
            service_name, 'endpoint-rule-set-1', api_version=api_version
        )

    def _register_retries(self, client):
        """Register retry handlers according to the client's retry mode.

        ``adaptive`` registers the standard handler as well as the adaptive
        one.  An unrecognized mode registers nothing and records no
        feature id.
        """
        retry_mode = client.meta.config.retries['mode']
        if retry_mode == 'standard':
            self._register_v2_standard_retries(client)
        elif retry_mode == 'adaptive':
            self._register_v2_standard_retries(client)
            self._register_v2_adaptive_retries(client)
        elif retry_mode == 'legacy':
            self._register_legacy_retries(client)
        else:
            return
        register_feature_id(f'RETRY_MODE_{retry_mode.upper()}')

    def _register_v2_standard_retries(self, client):
        """Register the standard retry handler.

        ``max_attempts`` is only forwarded when ``total_max_attempts`` is
        configured, so the handler's own default applies otherwise.
        """
        max_attempts = client.meta.config.retries.get('total_max_attempts')
        kwargs = {'client': client}
        if max_attempts is not None:
            kwargs['max_attempts'] = max_attempts
        standard.register_retry_handler(**kwargs)

    def _register_v2_adaptive_retries(self, client):
        """Register the adaptive retry handler on the client."""
        adaptive.register_retry_handler(client)

    def _register_legacy_retries(self, client):
        """Build and register the legacy ``needs-retry`` handler.

        Does nothing when the ``_retry`` data file is empty or missing
        (``load_data`` returned a falsy value).
        """
        endpoint_prefix = client.meta.service_model.endpoint_prefix
        service_id = client.meta.service_model.service_id
        service_event_name = service_id.hyphenize()

        # First, we load the entire retry config for all services,
        # then pull out just the information we need.
        original_config = self._loader.load_data('_retry')
        if not original_config:
            return

        retries = self._transform_legacy_retries(client.meta.config.retries)
        retry_config = self._retry_config_translator.build_retry_config(
            endpoint_prefix,
            original_config.get('retry', {}),
            original_config.get('definitions', {}),
            retries,
        )

        logger.debug(
            "Registering retry handlers for service: %s",
            client.meta.service_model.service_name,
        )
        handler = self._retry_handler_factory.create_retry_handler(
            retry_config, endpoint_prefix
        )
        unique_id = f'retry-config-{service_event_name}'
        client.meta.events.register(
            f"needs-retry.{service_event_name}", handler, unique_id=unique_id
        )

    def _transform_legacy_retries(self, retries):
        """Convert a retries config to the shape the legacy handler expects.

        ``total_max_attempts`` (which counts the initial request) is
        replaced by ``max_attempts`` holding that value minus one.  The
        input mapping is never mutated; a shallow copy is returned.

        :param retries: Retry configuration mapping, or ``None``.
        :return: A new dict, or ``None`` when ``retries`` is ``None``.
        """
        if retries is None:
            return None
        # A single copy suffices; the original copied the mapping twice.
        copied_args = retries.copy()
        if 'total_max_attempts' in copied_args:
            copied_args['max_attempts'] = (
                copied_args.pop('total_max_attempts') - 1
            )
        return copied_args

    def _get_retry_mode(self, client, config_store):
        """Return the retry mode from the client config or the config store.

        Falls back to ``'legacy'`` when neither source supplies a mode.
        """
        client_retries = client.meta.config.retries
        if (
            client_retries is not None
            and client_retries.get('mode') is not None
        ):
            return client_retries['mode']
        return config_store.get_config_variable('retry_mode') or 'legacy'

    def _register_endpoint_discovery(self, client, endpoint_url, config):
        """Attach endpoint discovery handlers, or a blocking handler.

        Nothing is registered for a custom ``endpoint_url`` or for a
        service without a discovery operation.  The setting is read from
        ``config`` first, then from the config store, and defaults to
        ``False``.  When discovery ends up disabled, a handler is
        registered on ``before-parameter-build`` instead.

        :raises InvalidEndpointDiscoveryConfigurationError: If the
            resolved setting is not a boolean, a boolean string or
            ``'auto'``.
        """
        if endpoint_url is not None:
            # Don't register any handlers in the case of a custom endpoint url
            return
        # Only attach handlers if the service supports discovery
        if client.meta.service_model.endpoint_discovery_operation is None:
            return
        events = client.meta.events
        service_id = client.meta.service_model.service_id.hyphenize()
        enabled = False
        if config and config.endpoint_discovery_enabled is not None:
            enabled = config.endpoint_discovery_enabled
        elif self._config_store:
            enabled = self._config_store.get_config_variable(
                'endpoint_discovery_enabled'
            )

        enabled = self._normalize_endpoint_discovery_config(enabled)
        if enabled and self._requires_endpoint_discovery(client, enabled):
            # 'auto' is truthy but is not True, so always_discover is only
            # set for an explicit boolean True.
            discover = enabled is True
            manager = EndpointDiscoveryManager(
                client, always_discover=discover
            )
            handler = EndpointDiscoveryHandler(manager)
            handler.register(events, service_id)
        else:
            events.register(
                'before-parameter-build',
                block_endpoint_discovery_required_operations,
            )

    def _normalize_endpoint_discovery_config(self, enabled):
        """Config must either be a boolean-string or string-literal 'auto'

        Strings are lower-cased and stripped before comparison.

        :return: ``'auto'`` or a ``bool``.
        :raises InvalidEndpointDiscoveryConfigurationError: For any other
            value, including ``None``.
        """
        if isinstance(enabled, str):
            enabled = enabled.lower().strip()
            if enabled == 'auto':
                return enabled
            elif enabled in ('true', 'false'):
                return ensure_boolean(enabled)
        elif isinstance(enabled, bool):
            return enabled

        raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)

    def _requires_endpoint_discovery(self, client, enabled):
        """Return whether discovery should be attached for this client.

        In ``'auto'`` mode the decision is delegated to the service
        model's ``endpoint_discovery_required``; otherwise ``enabled`` is
        returned unchanged.
        """
        if enabled == "auto":
            return client.meta.service_model.endpoint_discovery_required
        return enabled

    def _register_eventbridge_events(
        self, client, endpoint_bridge, endpoint_url
    ):
        """Register the EventBridge signer setter for the ``events`` service.

        ``endpoint_bridge`` is accepted but not used.
        """
        if client.meta.service_model.service_name != 'events':
            return
        EventbridgeSignerSetter(
            endpoint_resolver=self._endpoint_resolver,
            region=client.meta.region_name,
            endpoint_url=endpoint_url,
        ).register(client.meta.events)

    def _register_s3express_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the S3 Express identity resolver for ``s3`` clients.

        Only ``client`` is used; the remaining parameters are ignored.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3ExpressIdentityResolver(client, RefreshableCredentials).register()

    def _register_s3_events(
        self,
        client,
        endpoint_bridge,
        endpoint_url,
        client_config,
        scoped_config,
    ):
        """Register S3-specific handlers; a no-op for other services.

        Registers the region redirector, the presign signature version
        default, and the input-parameter capture hook.  ``endpoint_bridge``
        and ``endpoint_url`` are accepted but not used.
        """
        if client.meta.service_model.service_name != 's3':
            return
        S3RegionRedirectorv2(None, client).register()
        self._set_s3_presign_signature_version(
            client.meta, client_config, scoped_config
        )
        client.meta.events.register(
            'before-parameter-build.s3', self._inject_s3_input_parameters
        )

    def _register_s3_control_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the ARN parameter handler for ``s3control`` clients.

        Only ``client`` is used; the remaining parameters are ignored.
        """
        if client.meta.service_model.service_name != 's3control':
            return
        S3ControlArnParamHandlerv2().register(client.meta.events)

    def _set_s3_presign_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Default S3 presigning to sigv2 where that is still supported.

        No handler is registered when the customer configured a signature
        version, when the region is unknown (and is not ``aws-global``),
        or when the resolved endpoint does not list ``'s3'`` among its
        signature versions.
        """
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        provided_signature_version = _get_configured_signature_version(
            's3', client_config, scoped_config
        )
        if provided_signature_version is not None:
            return

        # Check to see if the region is a region that we know about. If we
        # don't know about a region, then we can safely assume it's a new
        # region that is sigv4 only, since all new S3 regions only allow sigv4.
        # The only exception is aws-global. This is a pseudo-region for the
        # global endpoint, we should respect the signature versions it
        # supports, which includes v2.
        regions = self._endpoint_resolver.get_available_endpoints(
            's3', client_meta.partition
        )
        if (
            client_meta.region_name != 'aws-global'
            and client_meta.region_name not in regions
        ):
            return

        # If it is a region we know about, we want to default to sigv2, so here
        # we check to see if it is available.
        endpoint = self._endpoint_resolver.construct_endpoint(
            's3', client_meta.region_name
        )
        signature_versions = endpoint['signatureVersions']
        if 's3' not in signature_versions:
            return

        # We now know that we're in a known region that supports sigv2 and
        # the customer hasn't set a signature version so we default the
        # signature version to sigv2.
        client_meta.events.register(
            'choose-signer.s3', self._default_s3_presign_to_sigv2
        )

    def _inject_s3_input_parameters(self, params, context, **kwargs):
        """Copy selected request parameters into ``context['input_params']``.

        Only ``Bucket``, ``Delete``, ``Key`` and ``Prefix`` are copied, and
        only when present in ``params``.  Any previous ``input_params``
        entry in ``context`` is replaced.
        """
        inject_parameters = ('Bucket', 'Delete', 'Key', 'Prefix')
        context['input_params'] = {
            name: params[name] for name in inject_parameters if name in params
        }

    def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
        """
        Returns the 's3' (sigv2) signer if presigning an s3 request. This is
        intended to be used to set the default signature version for the signer
        to sigv2. Situations where an asymmetric signature is required are the
        exception, for example MRAP needs v4a.

        :type signature_version: str
        :param signature_version: The current client signature version.

        :param kwargs: Any other keyword arguments supplied by the event
            emitter; they are accepted and ignored.

        :return: ``None`` for ``v4a*`` versions; the unchanged version for
            ``v4-s3express*`` versions; ``'s3-query'`` or
            ``'s3-presign-post'`` when the version ends with that suffix;
            ``None`` otherwise.
        """
        if signature_version.startswith('v4a'):
            return None

        if signature_version.startswith('v4-s3express'):
            return signature_version

        for suffix in ['-query', '-presign-post']:
            if signature_version.endswith(suffix):
                return f's3{suffix}'
        return None

    def _register_importexport_events(
        self,
        client,
        endpoint_bridge=None,
        endpoint_url=None,
        client_config=None,
        scoped_config=None,
    ):
        """Register the signer override for ``importexport`` clients.

        ``endpoint_bridge`` and ``endpoint_url`` are accepted but not used.
        """
        if client.meta.service_model.service_name != 'importexport':
            return
        self._set_importexport_signature_version(
            client.meta, client_config, scoped_config
        )

    def _set_importexport_signature_version(
        self, client_meta, client_config, scoped_config
    ):
        """Force sigv4 for importexport unless a version was configured."""
        # This will return the manually configured signature version, or None
        # if none was manually set. If a customer manually sets the signature
        # version, we always want to use what they set.
        configured_signature_version = _get_configured_signature_version(
            'importexport', client_config, scoped_config
        )
        if configured_signature_version is not None:
            return

        # importexport has a modeled signatureVersion of v2, but we
        # previously switched to v4 via endpoint.json before endpoint rulesets.
        # Override the model's signatureVersion for backwards compatibility.
        client_meta.events.register(
            'choose-signer.importexport', self._default_signer_to_sigv4
        )

    def _default_signer_to_sigv4(self, signature_version, **kwargs):
        """Event handler that always selects the ``'v4'`` signer."""
        return 'v4'

    def _get_client_args(
        self,
        service_model,
        region_name,
        is_secure,
        endpoint_url,
        verify,
        credentials,
        scoped_config,
        client_config,
        endpoint_bridge,
        auth_token,
        endpoints_ruleset_data,
        partition_data,
    ):
        """Delegate constructor-argument resolution to ``ClientArgsCreator``.

        :return: Whatever ``ClientArgsCreator.get_client_args`` returns; it
            is unpacked as keyword arguments for the client class.
        """
        args_creator = ClientArgsCreator(
            self._event_emitter,
            self._user_agent,
            self._response_parser_factory,
            self._loader,
            self._exceptions_factory,
            config_store=self._config_store,
            user_agent_creator=self._user_agent_creator,
        )
        return args_creator.get_client_args(
            service_model,
            region_name,
            is_secure,
            endpoint_url,
            verify,
            credentials,
            scoped_config,
            client_config,
            endpoint_bridge,
            auth_token,
            endpoints_ruleset_data,
            partition_data,
        )

    def _create_methods(self, service_model):
        """Return ``{snake_case_name: method}`` for every modeled operation."""
        op_dict = {}
        for operation_name in service_model.operation_names:
            py_operation_name = xform_name(operation_name)
            op_dict[py_operation_name] = self._create_api_method(
                py_operation_name, operation_name, service_model
            )
        return op_dict

    def _create_name_mapping(self, service_model):
        """Return the py_name -> OperationName mapping for a service."""
        # py_name -> OperationName, for every operation available
        # for a service.
        return {
            xform_name(operation_name): operation_name
            for operation_name in service_model.operation_names
        }

    def _create_api_method(
        self, py_operation_name, operation_name, service_model
    ):
        """Create the client method that invokes a single operation.

        The returned function rejects positional arguments, forwards its
        keyword arguments to ``_make_api_call`` and carries a lazily
        rendered docstring.
        """

        def _api_call(self, *args, **kwargs):
            # We're accepting *args so that we can give a more helpful
            # error message than TypeError: _api_call takes exactly
            # 1 argument.
            if args:
                raise TypeError(
                    f"{py_operation_name}() only accepts keyword arguments."
                )
            # The "self" in this scope is referring to the BaseClient.
            return self._make_api_call(operation_name, kwargs)

        _api_call.__name__ = str(py_operation_name)

        # Add the docstring to the client method
        operation_model = service_model.operation_model(operation_name)
        docstring = ClientMethodDocstring(
            operation_model=operation_model,
            method_name=operation_name,
            event_emitter=self._event_emitter,
            method_description=operation_model.documentation,
            example_prefix=f'response = client.{py_operation_name}',
            include_signature=False,
        )
        _api_call.__doc__ = docstring
        return _api_call

    def _evaluate_client_specific_token(self, signing_name):
        """Resolve an auth token for ``signing_name`` via the resolver.

        :return: ``None`` when no resolver is set or ``signing_name`` is
            falsy; otherwise the resolver's return value.  Exceptions
            raised by the resolver are not caught here and propagate to
            the caller.
        """
        resolver = self._auth_token_resolver
        if not resolver or not signing_name:
            return None

        return resolver(signing_name=signing_name)

627 

628 

class ClientEndpointBridge:
    """Bridges endpoint data and client creation

    This class handles taking out the relevant arguments from the endpoint
    resolver and determining which values to use, taking into account any
    client configuration options and scope configuration options.

    This class also handles determining what, if any, region to use if no
    explicit region setting is provided. For example, Amazon S3 client will
    utilize "us-east-1" by default if no region can be resolved."""

    # Hostname template used when the resolver cannot produce an endpoint.
    DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
    # Services whose dualstack flag may come from the ``s3`` config section.
    _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']

    def __init__(
        self,
        endpoint_resolver,
        scoped_config=None,
        client_config=None,
        default_endpoint=None,
        service_signing_name=None,
        config_store=None,
        service_signature_version=None,
    ):
        """Store the resolver and the configuration sources to consult.

        :param endpoint_resolver: Object providing ``construct_endpoint``.
        :param scoped_config: Optional mapping of scoped configuration.
        :param client_config: Optional client config object.
        :param default_endpoint: Hostname template; ``DEFAULT_ENDPOINT`` is
            used when this is falsy.
        :param service_signing_name: Signing name from the service model.
        :param config_store: Optional store queried for endpoint variants.
        :param service_signature_version: Signature version from the
            service model, preferred over resolver data when set.
        """
        self.service_signing_name = service_signing_name
        self.endpoint_resolver = endpoint_resolver
        self.scoped_config = scoped_config
        self.client_config = client_config
        self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
        self.config_store = config_store
        self.service_signature_version = service_signature_version

    def resolve(
        self, service_name, region_name=None, endpoint_url=None, is_secure=True
    ):
        """Resolve endpoint, region and signing details for a service.

        The resolver is tried first without a partition and then with the
        ``aws`` partition.  If both attempts fail, an endpoint is assumed
        from the default hostname template.

        :return: The dict produced by ``_create_result``.
        """
        region_name = self._check_default_region(service_name, region_name)
        use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
            service_name
        )
        use_fips_endpoint = self._resolve_endpoint_variant_config_var(
            'use_fips_endpoint'
        )
        resolved = self.endpoint_resolver.construct_endpoint(
            service_name,
            region_name,
            use_dualstack_endpoint=use_dualstack_endpoint,
            use_fips_endpoint=use_fips_endpoint,
        )

        # If we can't resolve the region, we'll attempt to get a global
        # endpoint for non-regionalized services (iam, route53, etc)
        if not resolved:
            # TODO: fallback partition_name should be configurable in the
            # future for users to define as needed.
            resolved = self.endpoint_resolver.construct_endpoint(
                service_name,
                region_name,
                partition_name='aws',
                use_dualstack_endpoint=use_dualstack_endpoint,
                use_fips_endpoint=use_fips_endpoint,
            )

        if resolved:
            return self._create_endpoint(
                resolved, service_name, region_name, endpoint_url, is_secure
            )
        else:
            return self._assume_endpoint(
                service_name, region_name, endpoint_url, is_secure
            )

    def resolver_uses_builtin_data(self):
        """Return the resolver's ``uses_builtin_data`` attribute."""
        return self.endpoint_resolver.uses_builtin_data

    def _check_default_region(self, service_name, region_name):
        """Pick the region: explicit argument first, then client config.

        ``service_name`` is accepted but not used.

        :return: The chosen region name, or ``None`` when neither source
            provides one.
        """
        if region_name is not None:
            return region_name
        # Use the client_config region if no explicit region was provided.
        if self.client_config and self.client_config.region_name is not None:
            return self.client_config.region_name
        return None

    def _create_endpoint(
        self, resolved, service_name, region_name, endpoint_url, is_secure
    ):
        """Build the result dict from a successfully resolved endpoint.

        A caller-supplied ``endpoint_url`` is kept as is; otherwise the
        URL is derived from the resolved hostname and protocols.
        """
        region_name, signing_region = self._pick_region_values(
            resolved, region_name, endpoint_url
        )
        if endpoint_url is None:
            endpoint_url = self._make_url(
                resolved.get('hostname'),
                is_secure,
                resolved.get('protocols', []),
            )
        signature_version = self._resolve_signature_version(
            service_name, resolved
        )
        signing_name = self._resolve_signing_name(service_name, resolved)
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=signing_region,
            signing_name=signing_name,
            endpoint_url=endpoint_url,
            metadata=resolved,
            signature_version=signature_version,
        )

    def _resolve_endpoint_variant_config_var(self, config_var):
        """Look up an endpoint-variant flag such as ``use_fips_endpoint``.

        :return: The client config value when it is not ``None``; else the
            config store's value when a store is set; else ``False``.
        """
        client_config = self.client_config
        config_val = False

        # Client configuration arg has precedence
        if client_config and getattr(client_config, config_var) is not None:
            return getattr(client_config, config_var)
        elif self.config_store is not None:
            # Check config store
            config_val = self.config_store.get_config_variable(config_var)
        return config_val

    def _resolve_use_dualstack_endpoint(self, service_name):
        """Resolve the dualstack flag, honoring the S3-specific setting.

        A non-``None`` answer from ``_is_s3_dualstack_mode`` wins over the
        generic ``use_dualstack_endpoint`` variable.
        """
        s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
        if s3_dualstack_mode is not None:
            return s3_dualstack_mode
        return self._resolve_endpoint_variant_config_var(
            'use_dualstack_endpoint'
        )

    def _is_s3_dualstack_mode(self, service_name):
        """Read ``use_dualstack_endpoint`` from the ``s3`` config section.

        :return: ``None`` for services outside
            ``_DUALSTACK_CUSTOMIZED_SERVICES``; the client config value
            when the key is present there; ``True`` when the scoped config
            value is ``True``, ``'True'`` or ``'true'``; ``None`` in every
            other case (a scoped value such as ``'false'`` is treated as
            "not set", not as ``False``).
        """
        if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
            return None
        # TODO: This normalization logic is duplicated from the
        # ClientArgsCreator class.  Consolidate everything to
        # ClientArgsCreator.  _resolve_signature_version also has similarly
        # duplicated logic.
        client_config = self.client_config
        if (
            client_config is not None
            and client_config.s3 is not None
            and 'use_dualstack_endpoint' in client_config.s3
        ):
            # Client config trumps scoped config.
            return client_config.s3['use_dualstack_endpoint']
        if self.scoped_config is not None:
            enabled = self.scoped_config.get('s3', {}).get(
                'use_dualstack_endpoint'
            )
            if enabled in [True, 'True', 'true']:
                return True
        return None

    def _assume_endpoint(
        self, service_name, region_name, endpoint_url, is_secure
    ):
        """Build a result when the resolver has no data for the service.

        The hostname comes from ``default_endpoint`` unless an
        ``endpoint_url`` was supplied; ``v4`` is offered as the only
        candidate signature version and the metadata is empty.
        """
        if endpoint_url is None:
            # Expand the default hostname URI template.
            hostname = self.default_endpoint.format(
                service=service_name, region=region_name
            )
            endpoint_url = self._make_url(
                hostname, is_secure, ['http', 'https']
            )
        logger.debug(
            'Assuming an endpoint for %s, %s: %s',
            service_name,
            region_name,
            endpoint_url,
        )
        # We still want to allow the user to provide an explicit version.
        signature_version = self._resolve_signature_version(
            service_name, {'signatureVersions': ['v4']}
        )
        signing_name = self._resolve_signing_name(service_name, resolved={})
        return self._create_result(
            service_name=service_name,
            region_name=region_name,
            signing_region=region_name,
            signing_name=signing_name,
            signature_version=signature_version,
            endpoint_url=endpoint_url,
            metadata={},
        )

    def _create_result(
        self,
        service_name,
        region_name,
        signing_region,
        signing_name,
        endpoint_url,
        signature_version,
        metadata,
    ):
        """Pack the resolved values into the dict returned by ``resolve``."""
        return {
            'service_name': service_name,
            'region_name': region_name,
            'signing_region': signing_region,
            'signing_name': signing_name,
            'endpoint_url': endpoint_url,
            'signature_version': signature_version,
            'metadata': metadata,
        }

    def _make_url(self, hostname, is_secure, supported_protocols):
        """Return ``scheme://hostname``.

        ``https`` is chosen only when ``is_secure`` is truthy and
        ``'https'`` is in ``supported_protocols``; otherwise ``http``.
        """
        if is_secure and 'https' in supported_protocols:
            scheme = 'https'
        else:
            scheme = 'http'
        return f'{scheme}://{hostname}'

    def _resolve_signing_name(self, service_name, resolved):
        """Pick the signing name.

        Precedence: the resolved ``credentialScope`` service, then the
        model's signing name, then the service name itself.
        """
        # CredentialScope overrides everything else.
        if (
            'credentialScope' in resolved
            and 'service' in resolved['credentialScope']
        ):
            return resolved['credentialScope']['service']
        # Use the signingName from the model if present.
        if self.service_signing_name:
            return self.service_signing_name
        # Just assume is the same as the service name.
        return service_name

    def _pick_region_values(self, resolved, region_name, endpoint_url):
        """Return ``(region_name, signing_region)`` for the endpoint.

        With a custom ``endpoint_url`` both values are the caller's
        ``region_name``.  Otherwise the resolved ``endpointName`` is used
        for both, and a ``credentialScope`` region, when present,
        overrides the signing region.
        """
        signing_region = region_name
        if endpoint_url is None:
            # Do not use the region name or signing name from the resolved
            # endpoint if the user explicitly provides an endpoint_url. This
            # would happen if we resolve to an endpoint where the service has
            # a "defaults" section that overrides all endpoint with a single
            # hostname and credentialScope. This has been the case historically
            # for how STS has worked. The only way to resolve an STS endpoint
            # was to provide a region_name and an endpoint_url. In that case,
            # we would still resolve an endpoint, but we would not use the
            # resolved endpointName or signingRegion because we want to allow
            # custom endpoints.
            region_name = resolved['endpointName']
            signing_region = region_name
            if (
                'credentialScope' in resolved
                and 'region' in resolved['credentialScope']
            ):
                signing_region = resolved['credentialScope']['region']
        return region_name, signing_region

    def _resolve_signature_version(self, service_name, resolved):
        """Choose the signature version for the client.

        Precedence: a customer-configured version; ``'s3v4'`` for ``s3``
        and ``s3-control``; the service model's version; and finally the
        resolver's ``signatureVersions`` list, preferring ``'v4'``.

        :raises UnknownSignatureVersionError: If no candidate version is
            present in ``AUTH_TYPE_MAPS``.
        """
        configured_version = _get_configured_signature_version(
            service_name, self.client_config, self.scoped_config
        )
        if configured_version is not None:
            return configured_version

        # These have since added the "auth" key to the service model
        # with "aws.auth#sigv4", but preserve existing behavior from
        # when we preferred endpoints.json over the service models
        if service_name in ('s3', 's3-control'):
            return 's3v4'

        if self.service_signature_version is not None:
            # Prefer the service model
            potential_versions = [self.service_signature_version]
        else:
            # Fall back to endpoints.json to preserve existing behavior, which
            # may be useful for users who have custom service models
            potential_versions = resolved.get('signatureVersions', [])
            # This was added for the V2 -> V4 transition,
            # for services that added V4 after V2 in endpoints.json
            if 'v4' in potential_versions:
                return 'v4'
        # Now just iterate over the signature versions in order until we
        # find the first one that is known to Botocore.
        for known in potential_versions:
            if known in AUTH_TYPE_MAPS:
                return known

        raise UnknownSignatureVersionError(
            signature_version=potential_versions
        )

905 

906 

class BaseClient:
    """Base class for service clients.

    Turns API parameters into request dictionaries, resolves the endpoint
    for each call, emits the client lifecycle events, sends requests
    through the endpoint object, and exposes paginators and waiters.
    """

    # This is actually reassigned with the py->op_name mapping
    # when the client creator creates the subclass.  This value is used
    # because calls such as client.get_paginator('list_objects') use the
    # snake_case name, but we need to know the ListObjects form.
    # xform_name() does the ListObjects->list_objects conversion, but
    # we need the reverse mapping here.
    _PY_TO_OP_NAME = {}

    def __init__(
        self,
        serializer,
        endpoint,
        response_parser,
        event_emitter,
        request_signer,
        service_model,
        loader,
        client_config,
        partition,
        exceptions_factory,
        endpoint_ruleset_resolver=None,
        user_agent_creator=None,
    ):
        """Store the collaborators and register the request handlers.

        ``endpoint_ruleset_resolver`` may be ``None``; endpoint resolution
        then falls back to ``self.meta.endpoint_url``.  When
        ``user_agent_creator`` is ``None`` one is built from the
        environment and the client config.
        """
        self._serializer = serializer
        self._endpoint = endpoint
        self._ruleset_resolver = endpoint_ruleset_resolver
        self._response_parser = response_parser
        self._request_signer = request_signer
        # Lazily populated with 'page_config' and 'waiter_config'.
        self._cache = {}
        self._loader = loader
        self._client_config = client_config
        self.meta = ClientMeta(
            event_emitter,
            self._client_config,
            endpoint.host,
            service_model,
            self._PY_TO_OP_NAME,
            partition,
        )
        self._exceptions_factory = exceptions_factory
        # Created on first access of the ``exceptions`` property.
        self._exceptions = None
        self._user_agent_creator = user_agent_creator
        if self._user_agent_creator is None:
            self._user_agent_creator = (
                UserAgentString.from_environment().with_client_config(
                    self._client_config
                )
            )
        self._register_handlers()

    def __getattr__(self, item):
        """Resolve unknown attributes through a ``getattr.*`` event.

        The first non-``None`` handler response is returned as the
        attribute value.

        :raises AttributeError: If no handler supplies a value, or if the
            instance has no ``meta`` attribute yet.
        """
        # ``_service_model`` reads ``self.meta``.  On an instance whose
        # __init__ has not run (for example while copy/pickle probe a
        # freshly created object with hasattr/getattr), the missing
        # ``meta`` would re-enter this method and recurse until
        # RecursionError.  Fail with a normal AttributeError instead.
        if 'meta' not in self.__dict__:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{item}'"
            )

        service_id = self._service_model.service_id.hyphenize()
        event_name = f'getattr.{service_id}.{item}'

        _, event_response = self.meta.events.emit_until_response(
            event_name, client=self
        )

        if event_response is not None:
            return event_response

        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{item}'"
        )

    def close(self):
        """Closes underlying endpoint connections."""
        self._endpoint.close()

    def _register_handlers(self):
        """Register the ``request-created`` handlers for this service."""
        # Register the handler required to sign requests.
        service_id = self.meta.service_model.service_id.hyphenize()
        self.meta.events.register(
            f"request-created.{service_id}", self._request_signer.handler
        )
        # Rebuild user agent string right before request is sent
        # to ensure all registered features are included.
        self.meta.events.register_last(
            f"request-created.{service_id}",
            self._user_agent_creator.rebuild_and_replace_user_agent_handler,
        )

    @property
    def _service_model(self):
        """The service model held by ``self.meta``."""
        return self.meta.service_model

    @with_current_context()
    def _make_api_call(self, operation_name, api_params):
        """Run one API operation and return its parsed response.

        :param operation_name: Operation name as known to the service
            model.
        :param api_params: Parameters supplied by the caller.

        :return: The parsed response.
        :raises: The exception class that ``self.exceptions.from_code``
            returns for the error code, when the HTTP status code is 300
            or greater.
        """
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.resolved_auth_type,
            'unsigned_payload': operation_model.unsigned_payload,
            'auth_options': self._service_model.metadata.get('auth'),
        }

        api_params = self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)

        service_id = self._service_model.service_id.hyphenize()
        _, event_response = self.meta.events.emit_until_response(
            f'before-call.{service_id}.{operation_name}',
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )

        if event_response is not None:
            # A before-call handler supplied the response, so no request
            # is sent.
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = self._make_request(
                operation_model, request_dict, request_context
            )

        self.meta.events.emit(
            f'after-call.{service_id}.{operation_name}',
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )

        if http.status_code >= 300:
            error_info = parsed_response.get("Error", {})
            # An 'error_code_override' placed in the request context takes
            # precedence over the code in the parsed response.
            error_code = request_context.get(
                'error_code_override'
            ) or error_info.get("Code")
            error_class = self.exceptions.from_code(error_code)
            raise error_class(parsed_response, operation_name)
        else:
            return parsed_response

    def _make_request(self, operation_model, request_dict, request_context):
        """Send the request through the endpoint.

        Any exception is announced with an ``after-call-error`` event and
        then re-raised unchanged.
        """
        try:
            return self._endpoint.make_request(operation_model, request_dict)
        except Exception as e:
            service_id = self._service_model.service_id.hyphenize()
            self.meta.events.emit(
                f'after-call-error.{service_id}.{operation_model.name}',
                exception=e,
                context=request_context,
            )
            raise

    def _convert_to_request_dict(
        self,
        api_params,
        operation_model,
        endpoint_url,
        context=None,
        headers=None,
        set_user_agent_header=True,
    ):
        """Serialize ``api_params`` and prepare the request dictionary.

        :param headers: Optional extra headers merged into the serialized
            request headers.
        :param set_user_agent_header: When false, ``None`` is passed as
            the user agent to ``prepare_request_dict``.

        :return: The prepared request dictionary.
        """
        request_dict = self._serializer.serialize_to_request(
            api_params, operation_model
        )
        if not self._client_config.inject_host_prefix:
            request_dict.pop('host_prefix', None)
        if headers is not None:
            request_dict['headers'].update(headers)
        if set_user_agent_header:
            user_agent = self._user_agent_creator.to_string()
        else:
            user_agent = None
        prepare_request_dict(
            request_dict,
            endpoint_url=endpoint_url,
            user_agent=user_agent,
            context=context,
        )
        return request_dict

    def _emit_api_params(self, api_params, operation_model, context):
        """Emit the parameter events and return the parameters to use."""
        # Given the API params provided by the user and the operation_model
        # we can serialize the request to a request_dict.
        operation_name = operation_model.name

        # Emit an event that allows users to modify the parameters at the
        # beginning of the method. It allows handlers to modify existing
        # parameters or return a new set of parameters to use.
        service_id = self._service_model.service_id.hyphenize()
        responses = self.meta.events.emit(
            f'provide-client-params.{service_id}.{operation_name}',
            params=api_params,
            model=operation_model,
            context=context,
        )
        api_params = first_non_none_response(responses, default=api_params)

        self.meta.events.emit(
            f'before-parameter-build.{service_id}.{operation_name}',
            params=api_params,
            model=operation_model,
            context=context,
        )
        return api_params

    def _resolve_endpoint_ruleset(
        self,
        operation_model,
        params,
        request_context,
        ignore_signing_region=False,
    ):
        """Returns endpoint URL and list of additional headers returned from
        EndpointRulesetResolver for the given operation and params. If the
        ruleset resolver is not available, for example because the service has
        no endpoints ruleset file, the legacy endpoint resolver's value is
        returned.

        Use ignore_signing_region for generating presigned URLs or any other
        situation where the signing region information from the ruleset
        resolver should be ignored.

        Returns tuple of URL, headers dictionary and endpoint properties.
        Additionally, the request_context dict is modified in place with any
        signing information returned from the ruleset resolver.
        """
        if self._ruleset_resolver is None:
            endpoint_url = self.meta.endpoint_url
            additional_headers = {}
            endpoint_properties = {}
        else:
            endpoint_info = self._ruleset_resolver.construct_endpoint(
                operation_model=operation_model,
                call_args=params,
                request_context=request_context,
            )
            endpoint_url = endpoint_info.url
            additional_headers = endpoint_info.headers
            endpoint_properties = endpoint_info.properties
            # If authSchemes is present, overwrite default auth type and
            # signing context derived from service model.
            auth_schemes = endpoint_info.properties.get('authSchemes')
            if auth_schemes is not None:
                auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx(
                    auth_schemes
                )
                auth_type, signing_context = auth_info
                request_context['auth_type'] = auth_type
                if 'region' in signing_context and ignore_signing_region:
                    del signing_context['region']
                if 'signing' in request_context:
                    request_context['signing'].update(signing_context)
                else:
                    request_context['signing'] = signing_context

        return endpoint_url, additional_headers, endpoint_properties

    def get_paginator(self, operation_name):
        """Create a paginator for an operation.

        :type operation_name: string
        :param operation_name: The operation name.  This is the same name
            as the method name on the client.  For example, if the
            method name is ``create_foo``, and you'd normally invoke the
            operation as ``client.create_foo(**kwargs)``, if the
            ``create_foo`` operation can be paginated, you can use the
            call ``client.get_paginator("create_foo")``.

        :raise OperationNotPageableError: Raised if the operation is not
            pageable.  You can use the ``client.can_paginate`` method to
            check if an operation is pageable.

        :rtype: ``botocore.paginate.Paginator``
        :return: A paginator object.

        """
        # can_paginate() also fills self._cache['page_config'], which is
        # read below.
        if not self.can_paginate(operation_name):
            raise OperationNotPageableError(operation_name=operation_name)

        actual_operation_name = self._PY_TO_OP_NAME[operation_name]

        # Create a new paginate method that will serve as a proxy to
        # the underlying Paginator.paginate method. This is needed to
        # attach a docstring to the method.
        def paginate(self, **kwargs):
            return Paginator.paginate(self, **kwargs)

        paginator_config = self._cache['page_config'][actual_operation_name]
        # Add the docstring for the paginate method.
        paginate.__doc__ = PaginatorDocstring(
            paginator_name=actual_operation_name,
            event_emitter=self.meta.events,
            service_model=self.meta.service_model,
            paginator_config=paginator_config,
            include_signature=False,
        )

        # Rename the paginator class based on the type of paginator.
        service_module_name = get_service_module_name(
            self.meta.service_model
        )
        paginator_class_name = (
            f"{service_module_name}.Paginator.{actual_operation_name}"
        )

        # Create the new paginator class
        documented_paginator_cls = type(
            paginator_class_name, (Paginator,), {'paginate': paginate}
        )

        operation_model = self._service_model.operation_model(
            actual_operation_name
        )
        paginator = documented_paginator_cls(
            getattr(self, operation_name),
            paginator_config,
            operation_model,
        )
        return paginator

    def can_paginate(self, operation_name):
        """Check if an operation can be paginated.

        :type operation_name: string
        :param operation_name: The operation name.  This is the same name
            as the method name on the client.  For example, if the
            method name is ``create_foo``, and you'd normally invoke the
            operation as ``client.create_foo(**kwargs)``, if the
            ``create_foo`` operation can be paginated, you can use the
            call ``client.get_paginator("create_foo")``.

        :return: ``True`` if the operation can be paginated,
            ``False`` otherwise.

        """
        if 'page_config' not in self._cache:
            try:
                page_config = self._loader.load_service_model(
                    self._service_model.service_name,
                    'paginators-1',
                    self._service_model.api_version,
                )['pagination']
                self._cache['page_config'] = page_config
            except DataNotFoundError:
                # No paginator definitions for this service: cache an
                # empty config so the loader is not asked again.
                self._cache['page_config'] = {}
        actual_operation_name = self._PY_TO_OP_NAME[operation_name]
        return actual_operation_name in self._cache['page_config']

    def _get_waiter_config(self):
        """Return the cached ``waiters-2`` model, loading it on first use.

        An empty dict is cached and returned when the loader raises
        ``DataNotFoundError``.
        """
        if 'waiter_config' not in self._cache:
            try:
                waiter_config = self._loader.load_service_model(
                    self._service_model.service_name,
                    'waiters-2',
                    self._service_model.api_version,
                )
                self._cache['waiter_config'] = waiter_config
            except DataNotFoundError:
                self._cache['waiter_config'] = {}
        return self._cache['waiter_config']

    def get_waiter(self, waiter_name):
        """Returns an object that can wait for some condition.

        :type waiter_name: str
        :param waiter_name: The name of the waiter to get. See the waiters
            section of the service docs for a list of available waiters.

        :raises ValueError: If the service has no waiter configuration or
            ``waiter_name`` is not one of its waiters.

        :returns: The specified waiter object.
        :rtype: ``botocore.waiter.Waiter``
        """
        config = self._get_waiter_config()
        if not config:
            raise ValueError(f"Waiter does not exist: {waiter_name}")
        model = waiter.WaiterModel(config)
        # Map the xform_name() form of each waiter name back to the name
        # used in the waiter model.
        mapping = {xform_name(name): name for name in model.waiter_names}
        if waiter_name not in mapping:
            raise ValueError(f"Waiter does not exist: {waiter_name}")

        return waiter.create_waiter_with_client(
            mapping[waiter_name], model, self
        )

    @CachedProperty
    def waiter_names(self):
        """Returns a list of all available waiters."""
        config = self._get_waiter_config()
        if not config:
            return []
        model = waiter.WaiterModel(config)
        # Waiter configs is a dict, we just want the waiter names
        # which are the keys in the dict.
        return [xform_name(name) for name in model.waiter_names]

    @property
    def exceptions(self):
        """The client exceptions object, created on first access."""
        if self._exceptions is None:
            self._exceptions = self._load_exceptions()
        return self._exceptions

    def _load_exceptions(self):
        """Build the client exceptions for this service model."""
        return self._exceptions_factory.create_client_exceptions(
            self._service_model
        )

    def _get_credentials(self):
        """
        This private interface is subject to abrupt breaking changes, including
        removal, in any botocore release.
        """
        return self._request_signer._credentials

1356 

1357 

class ClientMeta:
    """Container for client-level metadata.

    Keeping these values on a separate object serves two purposes:

    * It gives clients a place for advanced functionality.
    * It keeps extra client attributes out of the namespace where
      operation names are mapped to methods at runtime, so they can never
      collide with an operation name.

    """

    def __init__(
        self,
        events,
        client_config,
        endpoint_url,
        service_model,
        method_to_api_mapping,
        partition,
    ):
        # The event emitter is the only value exposed as a plain, writable
        # attribute; everything else is reachable through read-only
        # properties.
        self.events = events
        self._service_model = service_model
        self._client_config = client_config
        self._endpoint_url = endpoint_url
        self._partition = partition
        self._method_to_api_mapping = method_to_api_mapping

    @property
    def config(self):
        """The client config object passed at construction."""
        return self._client_config

    @property
    def region_name(self):
        """The ``region_name`` attribute of the client config."""
        return self._client_config.region_name

    @property
    def endpoint_url(self):
        """The endpoint URL passed at construction."""
        return self._endpoint_url

    @property
    def service_model(self):
        """The service model passed at construction."""
        return self._service_model

    @property
    def method_to_api_mapping(self):
        """The method-name to API-name mapping passed at construction."""
        return self._method_to_api_mapping

    @property
    def partition(self):
        """The partition passed at construction."""
        return self._partition

1410 

1411 

1412def _get_configured_signature_version( 

1413 service_name, client_config, scoped_config 

1414): 

1415 """ 

1416 Gets the manually configured signature version. 

1417 

1418 :returns: the customer configured signature version, or None if no 

1419 signature version was configured. 

1420 """ 

1421 # Client config overrides everything. 

1422 if client_config and client_config.signature_version is not None: 

1423 return client_config.signature_version 

1424 

1425 # Scoped config overrides picking from the endpoint metadata. 

1426 if scoped_config is not None: 

1427 # A given service may have service specific configuration in the 

1428 # config file, so we need to check there as well. 

1429 service_config = scoped_config.get(service_name) 

1430 if service_config is not None and isinstance(service_config, dict): 

1431 version = service_config.get('signature_version') 

1432 if version: 

1433 logger.debug( 

1434 "Switching signature version for service %s " 

1435 "to version %s based on config file override.", 

1436 service_name, 

1437 version, 

1438 ) 

1439 return version 

1440 return None