Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/client.py: 21%

533 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13import logging 

14 

15from botocore import waiter, xform_name 

16from botocore.args import ClientArgsCreator 

17from botocore.auth import AUTH_TYPE_MAPS 

18from botocore.awsrequest import prepare_request_dict 

19from botocore.compress import maybe_compress_request 

20from botocore.config import Config 

21from botocore.credentials import RefreshableCredentials 

22from botocore.discovery import ( 

23 EndpointDiscoveryHandler, 

24 EndpointDiscoveryManager, 

25 block_endpoint_discovery_required_operations, 

26) 

27from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring 

28from botocore.exceptions import ( 

29 DataNotFoundError, 

30 InvalidEndpointDiscoveryConfigurationError, 

31 OperationNotPageableError, 

32 UnknownServiceError, 

33 UnknownSignatureVersionError, 

34) 

35from botocore.history import get_global_history_recorder 

36from botocore.hooks import first_non_none_response 

37from botocore.httpchecksum import ( 

38 apply_request_checksum, 

39 resolve_checksum_context, 

40) 

41from botocore.model import ServiceModel 

42from botocore.paginate import Paginator 

43from botocore.retries import adaptive, standard 

44from botocore.useragent import UserAgentString 

45from botocore.utils import ( 

46 CachedProperty, 

47 EventbridgeSignerSetter, 

48 S3ControlArnParamHandlerv2, 

49 S3ExpressIdentityResolver, 

50 S3RegionRedirectorv2, 

51 ensure_boolean, 

52 get_service_module_name, 

53) 

54 

55# Keep these imported. There's pre-existing code that uses: 

56# "from botocore.client import UNSIGNED" 

57# "from botocore.client import ClientError" 

58# etc. 

59from botocore.exceptions import ClientError # noqa 

60from botocore.utils import S3ArnParamHandler # noqa 

61from botocore.utils import S3ControlArnParamHandler # noqa 

62from botocore.utils import S3ControlEndpointSetter # noqa 

63from botocore.utils import S3EndpointSetter # noqa 

64from botocore.utils import S3RegionRedirector # noqa 

65from botocore import UNSIGNED # noqa 

66 

67 

68_LEGACY_SIGNATURE_VERSIONS = frozenset( 

69 ( 

70 'v2', 

71 'v3', 

72 'v3https', 

73 'v4', 

74 's3', 

75 's3v4', 

76 ) 

77) 

78 

79 

80logger = logging.getLogger(__name__) 

81history_recorder = get_global_history_recorder() 

82 

83 

84class ClientCreator: 

85 """Creates client objects for a service.""" 

86 

87 def __init__( 

88 self, 

89 loader, 

90 endpoint_resolver, 

91 user_agent, 

92 event_emitter, 

93 retry_handler_factory, 

94 retry_config_translator, 

95 response_parser_factory=None, 

96 exceptions_factory=None, 

97 config_store=None, 

98 user_agent_creator=None, 

99 ): 

100 self._loader = loader 

101 self._endpoint_resolver = endpoint_resolver 

102 self._user_agent = user_agent 

103 self._event_emitter = event_emitter 

104 self._retry_handler_factory = retry_handler_factory 

105 self._retry_config_translator = retry_config_translator 

106 self._response_parser_factory = response_parser_factory 

107 self._exceptions_factory = exceptions_factory 

108 # TODO: Migrate things away from scoped_config in favor of the 

109 # config_store. The config store can pull things from both the scoped 

110 # config and environment variables (and potentially more in the 

111 # future). 

112 self._config_store = config_store 

113 self._user_agent_creator = user_agent_creator 

114 

115 def create_client( 

116 self, 

117 service_name, 

118 region_name, 

119 is_secure=True, 

120 endpoint_url=None, 

121 verify=None, 

122 credentials=None, 

123 scoped_config=None, 

124 api_version=None, 

125 client_config=None, 

126 auth_token=None, 

127 ): 

128 responses = self._event_emitter.emit( 

129 'choose-service-name', service_name=service_name 

130 ) 

131 service_name = first_non_none_response(responses, default=service_name) 

132 service_model = self._load_service_model(service_name, api_version) 

133 try: 

134 endpoints_ruleset_data = self._load_service_endpoints_ruleset( 

135 service_name, api_version 

136 ) 

137 partition_data = self._loader.load_data('partitions') 

138 except UnknownServiceError: 

139 endpoints_ruleset_data = None 

140 partition_data = None 

141 logger.info( 

142 'No endpoints ruleset found for service %s, falling back to ' 

143 'legacy endpoint routing.', 

144 service_name, 

145 ) 

146 

147 cls = self._create_client_class(service_name, service_model) 

148 region_name, client_config = self._normalize_fips_region( 

149 region_name, client_config 

150 ) 

151 endpoint_bridge = ClientEndpointBridge( 

152 self._endpoint_resolver, 

153 scoped_config, 

154 client_config, 

155 service_signing_name=service_model.metadata.get('signingName'), 

156 config_store=self._config_store, 

157 service_signature_version=service_model.metadata.get( 

158 'signatureVersion' 

159 ), 

160 ) 

161 client_args = self._get_client_args( 

162 service_model, 

163 region_name, 

164 is_secure, 

165 endpoint_url, 

166 verify, 

167 credentials, 

168 scoped_config, 

169 client_config, 

170 endpoint_bridge, 

171 auth_token, 

172 endpoints_ruleset_data, 

173 partition_data, 

174 ) 

175 service_client = cls(**client_args) 

176 self._register_retries(service_client) 

177 self._register_s3_events( 

178 client=service_client, 

179 endpoint_bridge=None, 

180 endpoint_url=None, 

181 client_config=client_config, 

182 scoped_config=scoped_config, 

183 ) 

184 self._register_s3express_events(client=service_client) 

185 self._register_s3_control_events(client=service_client) 

186 self._register_endpoint_discovery( 

187 service_client, endpoint_url, client_config 

188 ) 

189 return service_client 

190 

191 def create_client_class(self, service_name, api_version=None): 

192 service_model = self._load_service_model(service_name, api_version) 

193 return self._create_client_class(service_name, service_model) 

194 

195 def _create_client_class(self, service_name, service_model): 

196 class_attributes = self._create_methods(service_model) 

197 py_name_to_operation_name = self._create_name_mapping(service_model) 

198 class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name 

199 bases = [BaseClient] 

200 service_id = service_model.service_id.hyphenize() 

201 self._event_emitter.emit( 

202 'creating-client-class.%s' % service_id, 

203 class_attributes=class_attributes, 

204 base_classes=bases, 

205 ) 

206 class_name = get_service_module_name(service_model) 

207 cls = type(str(class_name), tuple(bases), class_attributes) 

208 return cls 

209 

210 def _normalize_fips_region(self, region_name, client_config): 

211 if region_name is not None: 

212 normalized_region_name = region_name.replace('fips-', '').replace( 

213 '-fips', '' 

214 ) 

215 # If region has been transformed then set flag 

216 if normalized_region_name != region_name: 

217 config_use_fips_endpoint = Config(use_fips_endpoint=True) 

218 if client_config: 

219 # Keeping endpoint setting client specific 

220 client_config = client_config.merge( 

221 config_use_fips_endpoint 

222 ) 

223 else: 

224 client_config = config_use_fips_endpoint 

225 logger.warning( 

226 'transforming region from %s to %s and setting ' 

227 'use_fips_endpoint to true. client should not ' 

228 'be configured with a fips psuedo region.' 

229 % (region_name, normalized_region_name) 

230 ) 

231 region_name = normalized_region_name 

232 return region_name, client_config 

233 

234 def _load_service_model(self, service_name, api_version=None): 

235 json_model = self._loader.load_service_model( 

236 service_name, 'service-2', api_version=api_version 

237 ) 

238 service_model = ServiceModel(json_model, service_name=service_name) 

239 return service_model 

240 

241 def _load_service_endpoints_ruleset(self, service_name, api_version=None): 

242 return self._loader.load_service_model( 

243 service_name, 'endpoint-rule-set-1', api_version=api_version 

244 ) 

245 

246 def _register_retries(self, client): 

247 retry_mode = client.meta.config.retries['mode'] 

248 if retry_mode == 'standard': 

249 self._register_v2_standard_retries(client) 

250 elif retry_mode == 'adaptive': 

251 self._register_v2_standard_retries(client) 

252 self._register_v2_adaptive_retries(client) 

253 elif retry_mode == 'legacy': 

254 self._register_legacy_retries(client) 

255 

256 def _register_v2_standard_retries(self, client): 

257 max_attempts = client.meta.config.retries.get('total_max_attempts') 

258 kwargs = {'client': client} 

259 if max_attempts is not None: 

260 kwargs['max_attempts'] = max_attempts 

261 standard.register_retry_handler(**kwargs) 

262 

263 def _register_v2_adaptive_retries(self, client): 

264 adaptive.register_retry_handler(client) 

265 

266 def _register_legacy_retries(self, client): 

267 endpoint_prefix = client.meta.service_model.endpoint_prefix 

268 service_id = client.meta.service_model.service_id 

269 service_event_name = service_id.hyphenize() 

270 

271 # First, we load the entire retry config for all services, 

272 # then pull out just the information we need. 

273 original_config = self._loader.load_data('_retry') 

274 if not original_config: 

275 return 

276 

277 retries = self._transform_legacy_retries(client.meta.config.retries) 

278 retry_config = self._retry_config_translator.build_retry_config( 

279 endpoint_prefix, 

280 original_config.get('retry', {}), 

281 original_config.get('definitions', {}), 

282 retries, 

283 ) 

284 

285 logger.debug( 

286 "Registering retry handlers for service: %s", 

287 client.meta.service_model.service_name, 

288 ) 

289 handler = self._retry_handler_factory.create_retry_handler( 

290 retry_config, endpoint_prefix 

291 ) 

292 unique_id = 'retry-config-%s' % service_event_name 

293 client.meta.events.register( 

294 f"needs-retry.{service_event_name}", handler, unique_id=unique_id 

295 ) 

296 

297 def _transform_legacy_retries(self, retries): 

298 if retries is None: 

299 return 

300 copied_args = retries.copy() 

301 if 'total_max_attempts' in retries: 

302 copied_args = retries.copy() 

303 copied_args['max_attempts'] = ( 

304 copied_args.pop('total_max_attempts') - 1 

305 ) 

306 return copied_args 

307 

308 def _get_retry_mode(self, client, config_store): 

309 client_retries = client.meta.config.retries 

310 if ( 

311 client_retries is not None 

312 and client_retries.get('mode') is not None 

313 ): 

314 return client_retries['mode'] 

315 return config_store.get_config_variable('retry_mode') or 'legacy' 

316 

317 def _register_endpoint_discovery(self, client, endpoint_url, config): 

318 if endpoint_url is not None: 

319 # Don't register any handlers in the case of a custom endpoint url 

320 return 

321 # Only attach handlers if the service supports discovery 

322 if client.meta.service_model.endpoint_discovery_operation is None: 

323 return 

324 events = client.meta.events 

325 service_id = client.meta.service_model.service_id.hyphenize() 

326 enabled = False 

327 if config and config.endpoint_discovery_enabled is not None: 

328 enabled = config.endpoint_discovery_enabled 

329 elif self._config_store: 

330 enabled = self._config_store.get_config_variable( 

331 'endpoint_discovery_enabled' 

332 ) 

333 

334 enabled = self._normalize_endpoint_discovery_config(enabled) 

335 if enabled and self._requires_endpoint_discovery(client, enabled): 

336 discover = enabled is True 

337 manager = EndpointDiscoveryManager( 

338 client, always_discover=discover 

339 ) 

340 handler = EndpointDiscoveryHandler(manager) 

341 handler.register(events, service_id) 

342 else: 

343 events.register( 

344 'before-parameter-build', 

345 block_endpoint_discovery_required_operations, 

346 ) 

347 

348 def _normalize_endpoint_discovery_config(self, enabled): 

349 """Config must either be a boolean-string or string-literal 'auto'""" 

350 if isinstance(enabled, str): 

351 enabled = enabled.lower().strip() 

352 if enabled == 'auto': 

353 return enabled 

354 elif enabled in ('true', 'false'): 

355 return ensure_boolean(enabled) 

356 elif isinstance(enabled, bool): 

357 return enabled 

358 

359 raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled) 

360 

361 def _requires_endpoint_discovery(self, client, enabled): 

362 if enabled == "auto": 

363 return client.meta.service_model.endpoint_discovery_required 

364 return enabled 

365 

366 def _register_eventbridge_events( 

367 self, client, endpoint_bridge, endpoint_url 

368 ): 

369 if client.meta.service_model.service_name != 'events': 

370 return 

371 EventbridgeSignerSetter( 

372 endpoint_resolver=self._endpoint_resolver, 

373 region=client.meta.region_name, 

374 endpoint_url=endpoint_url, 

375 ).register(client.meta.events) 

376 

377 def _register_s3express_events( 

378 self, 

379 client, 

380 endpoint_bridge=None, 

381 endpoint_url=None, 

382 client_config=None, 

383 scoped_config=None, 

384 ): 

385 if client.meta.service_model.service_name != 's3': 

386 return 

387 S3ExpressIdentityResolver(client, RefreshableCredentials).register() 

388 

389 def _register_s3_events( 

390 self, 

391 client, 

392 endpoint_bridge, 

393 endpoint_url, 

394 client_config, 

395 scoped_config, 

396 ): 

397 if client.meta.service_model.service_name != 's3': 

398 return 

399 S3RegionRedirectorv2(None, client).register() 

400 self._set_s3_presign_signature_version( 

401 client.meta, client_config, scoped_config 

402 ) 

403 

404 def _register_s3_control_events( 

405 self, 

406 client, 

407 endpoint_bridge=None, 

408 endpoint_url=None, 

409 client_config=None, 

410 scoped_config=None, 

411 ): 

412 if client.meta.service_model.service_name != 's3control': 

413 return 

414 S3ControlArnParamHandlerv2().register(client.meta.events) 

415 

416 def _set_s3_presign_signature_version( 

417 self, client_meta, client_config, scoped_config 

418 ): 

419 # This will return the manually configured signature version, or None 

420 # if none was manually set. If a customer manually sets the signature 

421 # version, we always want to use what they set. 

422 provided_signature_version = _get_configured_signature_version( 

423 's3', client_config, scoped_config 

424 ) 

425 if provided_signature_version is not None: 

426 return 

427 

428 # Check to see if the region is a region that we know about. If we 

429 # don't know about a region, then we can safely assume it's a new 

430 # region that is sigv4 only, since all new S3 regions only allow sigv4. 

431 # The only exception is aws-global. This is a pseudo-region for the 

432 # global endpoint, we should respect the signature versions it 

433 # supports, which includes v2. 

434 regions = self._endpoint_resolver.get_available_endpoints( 

435 's3', client_meta.partition 

436 ) 

437 if ( 

438 client_meta.region_name != 'aws-global' 

439 and client_meta.region_name not in regions 

440 ): 

441 return 

442 

443 # If it is a region we know about, we want to default to sigv2, so here 

444 # we check to see if it is available. 

445 endpoint = self._endpoint_resolver.construct_endpoint( 

446 's3', client_meta.region_name 

447 ) 

448 signature_versions = endpoint['signatureVersions'] 

449 if 's3' not in signature_versions: 

450 return 

451 

452 # We now know that we're in a known region that supports sigv2 and 

453 # the customer hasn't set a signature version so we default the 

454 # signature version to sigv2. 

455 client_meta.events.register( 

456 'choose-signer.s3', self._default_s3_presign_to_sigv2 

457 ) 

458 

459 def _default_s3_presign_to_sigv2(self, signature_version, **kwargs): 

460 """ 

461 Returns the 's3' (sigv2) signer if presigning an s3 request. This is 

462 intended to be used to set the default signature version for the signer 

463 to sigv2. Situations where an asymmetric signature is required are the 

464 exception, for example MRAP needs v4a. 

465 

466 :type signature_version: str 

467 :param signature_version: The current client signature version. 

468 

469 :type signing_name: str 

470 :param signing_name: The signing name of the service. 

471 

472 :return: 's3' if the request is an s3 presign request, None otherwise 

473 """ 

474 if signature_version.startswith('v4a'): 

475 return 

476 

477 if signature_version.startswith('v4-s3express'): 

478 return f'{signature_version}' 

479 

480 for suffix in ['-query', '-presign-post']: 

481 if signature_version.endswith(suffix): 

482 return f's3{suffix}' 

483 

484 def _get_client_args( 

485 self, 

486 service_model, 

487 region_name, 

488 is_secure, 

489 endpoint_url, 

490 verify, 

491 credentials, 

492 scoped_config, 

493 client_config, 

494 endpoint_bridge, 

495 auth_token, 

496 endpoints_ruleset_data, 

497 partition_data, 

498 ): 

499 args_creator = ClientArgsCreator( 

500 self._event_emitter, 

501 self._user_agent, 

502 self._response_parser_factory, 

503 self._loader, 

504 self._exceptions_factory, 

505 config_store=self._config_store, 

506 user_agent_creator=self._user_agent_creator, 

507 ) 

508 return args_creator.get_client_args( 

509 service_model, 

510 region_name, 

511 is_secure, 

512 endpoint_url, 

513 verify, 

514 credentials, 

515 scoped_config, 

516 client_config, 

517 endpoint_bridge, 

518 auth_token, 

519 endpoints_ruleset_data, 

520 partition_data, 

521 ) 

522 

523 def _create_methods(self, service_model): 

524 op_dict = {} 

525 for operation_name in service_model.operation_names: 

526 py_operation_name = xform_name(operation_name) 

527 op_dict[py_operation_name] = self._create_api_method( 

528 py_operation_name, operation_name, service_model 

529 ) 

530 return op_dict 

531 

532 def _create_name_mapping(self, service_model): 

533 # py_name -> OperationName, for every operation available 

534 # for a service. 

535 mapping = {} 

536 for operation_name in service_model.operation_names: 

537 py_operation_name = xform_name(operation_name) 

538 mapping[py_operation_name] = operation_name 

539 return mapping 

540 

541 def _create_api_method( 

542 self, py_operation_name, operation_name, service_model 

543 ): 

544 def _api_call(self, *args, **kwargs): 

545 # We're accepting *args so that we can give a more helpful 

546 # error message than TypeError: _api_call takes exactly 

547 # 1 argument. 

548 if args: 

549 raise TypeError( 

550 f"{py_operation_name}() only accepts keyword arguments." 

551 ) 

552 # The "self" in this scope is referring to the BaseClient. 

553 return self._make_api_call(operation_name, kwargs) 

554 

555 _api_call.__name__ = str(py_operation_name) 

556 

557 # Add the docstring to the client method 

558 operation_model = service_model.operation_model(operation_name) 

559 docstring = ClientMethodDocstring( 

560 operation_model=operation_model, 

561 method_name=operation_name, 

562 event_emitter=self._event_emitter, 

563 method_description=operation_model.documentation, 

564 example_prefix='response = client.%s' % py_operation_name, 

565 include_signature=False, 

566 ) 

567 _api_call.__doc__ = docstring 

568 return _api_call 

569 

570 

571class ClientEndpointBridge: 

572 """Bridges endpoint data and client creation 

573 

574 This class handles taking out the relevant arguments from the endpoint 

575 resolver and determining which values to use, taking into account any 

576 client configuration options and scope configuration options. 

577 

578 This class also handles determining what, if any, region to use if no 

579 explicit region setting is provided. For example, Amazon S3 client will 

580 utilize "us-east-1" by default if no region can be resolved.""" 

581 

582 DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com' 

583 _DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control'] 

584 

585 def __init__( 

586 self, 

587 endpoint_resolver, 

588 scoped_config=None, 

589 client_config=None, 

590 default_endpoint=None, 

591 service_signing_name=None, 

592 config_store=None, 

593 service_signature_version=None, 

594 ): 

595 self.service_signing_name = service_signing_name 

596 self.endpoint_resolver = endpoint_resolver 

597 self.scoped_config = scoped_config 

598 self.client_config = client_config 

599 self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT 

600 self.config_store = config_store 

601 self.service_signature_version = service_signature_version 

602 

603 def resolve( 

604 self, service_name, region_name=None, endpoint_url=None, is_secure=True 

605 ): 

606 region_name = self._check_default_region(service_name, region_name) 

607 use_dualstack_endpoint = self._resolve_use_dualstack_endpoint( 

608 service_name 

609 ) 

610 use_fips_endpoint = self._resolve_endpoint_variant_config_var( 

611 'use_fips_endpoint' 

612 ) 

613 resolved = self.endpoint_resolver.construct_endpoint( 

614 service_name, 

615 region_name, 

616 use_dualstack_endpoint=use_dualstack_endpoint, 

617 use_fips_endpoint=use_fips_endpoint, 

618 ) 

619 

620 # If we can't resolve the region, we'll attempt to get a global 

621 # endpoint for non-regionalized services (iam, route53, etc) 

622 if not resolved: 

623 # TODO: fallback partition_name should be configurable in the 

624 # future for users to define as needed. 

625 resolved = self.endpoint_resolver.construct_endpoint( 

626 service_name, 

627 region_name, 

628 partition_name='aws', 

629 use_dualstack_endpoint=use_dualstack_endpoint, 

630 use_fips_endpoint=use_fips_endpoint, 

631 ) 

632 

633 if resolved: 

634 return self._create_endpoint( 

635 resolved, service_name, region_name, endpoint_url, is_secure 

636 ) 

637 else: 

638 return self._assume_endpoint( 

639 service_name, region_name, endpoint_url, is_secure 

640 ) 

641 

642 def resolver_uses_builtin_data(self): 

643 return self.endpoint_resolver.uses_builtin_data 

644 

645 def _check_default_region(self, service_name, region_name): 

646 if region_name is not None: 

647 return region_name 

648 # Use the client_config region if no explicit region was provided. 

649 if self.client_config and self.client_config.region_name is not None: 

650 return self.client_config.region_name 

651 

652 def _create_endpoint( 

653 self, resolved, service_name, region_name, endpoint_url, is_secure 

654 ): 

655 region_name, signing_region = self._pick_region_values( 

656 resolved, region_name, endpoint_url 

657 ) 

658 if endpoint_url is None: 

659 endpoint_url = self._make_url( 

660 resolved.get('hostname'), 

661 is_secure, 

662 resolved.get('protocols', []), 

663 ) 

664 signature_version = self._resolve_signature_version( 

665 service_name, resolved 

666 ) 

667 signing_name = self._resolve_signing_name(service_name, resolved) 

668 return self._create_result( 

669 service_name=service_name, 

670 region_name=region_name, 

671 signing_region=signing_region, 

672 signing_name=signing_name, 

673 endpoint_url=endpoint_url, 

674 metadata=resolved, 

675 signature_version=signature_version, 

676 ) 

677 

678 def _resolve_endpoint_variant_config_var(self, config_var): 

679 client_config = self.client_config 

680 config_val = False 

681 

682 # Client configuration arg has precedence 

683 if client_config and getattr(client_config, config_var) is not None: 

684 return getattr(client_config, config_var) 

685 elif self.config_store is not None: 

686 # Check config store 

687 config_val = self.config_store.get_config_variable(config_var) 

688 return config_val 

689 

690 def _resolve_use_dualstack_endpoint(self, service_name): 

691 s3_dualstack_mode = self._is_s3_dualstack_mode(service_name) 

692 if s3_dualstack_mode is not None: 

693 return s3_dualstack_mode 

694 return self._resolve_endpoint_variant_config_var( 

695 'use_dualstack_endpoint' 

696 ) 

697 

698 def _is_s3_dualstack_mode(self, service_name): 

699 if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES: 

700 return None 

701 # TODO: This normalization logic is duplicated from the 

702 # ClientArgsCreator class. Consolidate everything to 

703 # ClientArgsCreator. _resolve_signature_version also has similarly 

704 # duplicated logic. 

705 client_config = self.client_config 

706 if ( 

707 client_config is not None 

708 and client_config.s3 is not None 

709 and 'use_dualstack_endpoint' in client_config.s3 

710 ): 

711 # Client config trumps scoped config. 

712 return client_config.s3['use_dualstack_endpoint'] 

713 if self.scoped_config is not None: 

714 enabled = self.scoped_config.get('s3', {}).get( 

715 'use_dualstack_endpoint' 

716 ) 

717 if enabled in [True, 'True', 'true']: 

718 return True 

719 

720 def _assume_endpoint( 

721 self, service_name, region_name, endpoint_url, is_secure 

722 ): 

723 if endpoint_url is None: 

724 # Expand the default hostname URI template. 

725 hostname = self.default_endpoint.format( 

726 service=service_name, region=region_name 

727 ) 

728 endpoint_url = self._make_url( 

729 hostname, is_secure, ['http', 'https'] 

730 ) 

731 logger.debug( 

732 f'Assuming an endpoint for {service_name}, {region_name}: {endpoint_url}' 

733 ) 

734 # We still want to allow the user to provide an explicit version. 

735 signature_version = self._resolve_signature_version( 

736 service_name, {'signatureVersions': ['v4']} 

737 ) 

738 signing_name = self._resolve_signing_name(service_name, resolved={}) 

739 return self._create_result( 

740 service_name=service_name, 

741 region_name=region_name, 

742 signing_region=region_name, 

743 signing_name=signing_name, 

744 signature_version=signature_version, 

745 endpoint_url=endpoint_url, 

746 metadata={}, 

747 ) 

748 

749 def _create_result( 

750 self, 

751 service_name, 

752 region_name, 

753 signing_region, 

754 signing_name, 

755 endpoint_url, 

756 signature_version, 

757 metadata, 

758 ): 

759 return { 

760 'service_name': service_name, 

761 'region_name': region_name, 

762 'signing_region': signing_region, 

763 'signing_name': signing_name, 

764 'endpoint_url': endpoint_url, 

765 'signature_version': signature_version, 

766 'metadata': metadata, 

767 } 

768 

769 def _make_url(self, hostname, is_secure, supported_protocols): 

770 if is_secure and 'https' in supported_protocols: 

771 scheme = 'https' 

772 else: 

773 scheme = 'http' 

774 return f'{scheme}://{hostname}' 

775 

776 def _resolve_signing_name(self, service_name, resolved): 

777 # CredentialScope overrides everything else. 

778 if ( 

779 'credentialScope' in resolved 

780 and 'service' in resolved['credentialScope'] 

781 ): 

782 return resolved['credentialScope']['service'] 

783 # Use the signingName from the model if present. 

784 if self.service_signing_name: 

785 return self.service_signing_name 

786 # Just assume is the same as the service name. 

787 return service_name 

788 

789 def _pick_region_values(self, resolved, region_name, endpoint_url): 

790 signing_region = region_name 

791 if endpoint_url is None: 

792 # Do not use the region name or signing name from the resolved 

793 # endpoint if the user explicitly provides an endpoint_url. This 

794 # would happen if we resolve to an endpoint where the service has 

795 # a "defaults" section that overrides all endpoint with a single 

796 # hostname and credentialScope. This has been the case historically 

797 # for how STS has worked. The only way to resolve an STS endpoint 

798 # was to provide a region_name and an endpoint_url. In that case, 

799 # we would still resolve an endpoint, but we would not use the 

800 # resolved endpointName or signingRegion because we want to allow 

801 # custom endpoints. 

802 region_name = resolved['endpointName'] 

803 signing_region = region_name 

804 if ( 

805 'credentialScope' in resolved 

806 and 'region' in resolved['credentialScope'] 

807 ): 

808 signing_region = resolved['credentialScope']['region'] 

809 return region_name, signing_region 

810 

811 def _resolve_signature_version(self, service_name, resolved): 

812 configured_version = _get_configured_signature_version( 

813 service_name, self.client_config, self.scoped_config 

814 ) 

815 if configured_version is not None: 

816 return configured_version 

817 

818 potential_versions = resolved.get('signatureVersions', []) 

819 if ( 

820 self.service_signature_version is not None 

821 and self.service_signature_version 

822 not in _LEGACY_SIGNATURE_VERSIONS 

823 ): 

824 # Prefer the service model as most specific 

825 # source of truth for new signature versions. 

826 potential_versions = [self.service_signature_version] 

827 

828 # Pick a signature version from the endpoint metadata if present. 

829 if 'signatureVersions' in resolved: 

830 if service_name == 's3': 

831 return 's3v4' 

832 if 'v4' in potential_versions: 

833 return 'v4' 

834 # Now just iterate over the signature versions in order until we 

835 # find the first one that is known to Botocore. 

836 for known in potential_versions: 

837 if known in AUTH_TYPE_MAPS: 

838 return known 

839 raise UnknownSignatureVersionError( 

840 signature_version=potential_versions 

841 ) 

842 

843 

844class BaseClient: 

845 # This is actually reassigned with the py->op_name mapping 

846 # when the client creator creates the subclass. This value is used 

847 # because calls such as client.get_paginator('list_objects') use the 

848 # snake_case name, but we need to know the ListObjects form. 

849 # xform_name() does the ListObjects->list_objects conversion, but 

850 # we need the reverse mapping here. 

851 _PY_TO_OP_NAME = {} 

852 

853 def __init__( 

854 self, 

855 serializer, 

856 endpoint, 

857 response_parser, 

858 event_emitter, 

859 request_signer, 

860 service_model, 

861 loader, 

862 client_config, 

863 partition, 

864 exceptions_factory, 

865 endpoint_ruleset_resolver=None, 

866 user_agent_creator=None, 

867 ): 

868 self._serializer = serializer 

869 self._endpoint = endpoint 

870 self._ruleset_resolver = endpoint_ruleset_resolver 

871 self._response_parser = response_parser 

872 self._request_signer = request_signer 

873 self._cache = {} 

874 self._loader = loader 

875 self._client_config = client_config 

876 self.meta = ClientMeta( 

877 event_emitter, 

878 self._client_config, 

879 endpoint.host, 

880 service_model, 

881 self._PY_TO_OP_NAME, 

882 partition, 

883 ) 

884 self._exceptions_factory = exceptions_factory 

885 self._exceptions = None 

886 self._user_agent_creator = user_agent_creator 

887 if self._user_agent_creator is None: 

888 self._user_agent_creator = ( 

889 UserAgentString.from_environment().with_client_config( 

890 self._client_config 

891 ) 

892 ) 

893 self._register_handlers() 

894 

895 def __getattr__(self, item): 

896 service_id = self._service_model.service_id.hyphenize() 

897 event_name = f'getattr.{service_id}.{item}' 

898 

899 handler, event_response = self.meta.events.emit_until_response( 

900 event_name, client=self 

901 ) 

902 

903 if event_response is not None: 

904 return event_response 

905 

906 raise AttributeError( 

907 f"'{self.__class__.__name__}' object has no attribute '{item}'" 

908 ) 

909 

910 def close(self): 

911 """Closes underlying endpoint connections.""" 

912 self._endpoint.close() 

913 

914 def _register_handlers(self): 

915 # Register the handler required to sign requests. 

916 service_id = self.meta.service_model.service_id.hyphenize() 

917 self.meta.events.register( 

918 f"request-created.{service_id}", self._request_signer.handler 

919 ) 

920 

921 @property 

922 def _service_model(self): 

923 return self.meta.service_model 

924 

925 def _make_api_call(self, operation_name, api_params): 

926 operation_model = self._service_model.operation_model(operation_name) 

927 service_name = self._service_model.service_name 

928 history_recorder.record( 

929 'API_CALL', 

930 { 

931 'service': service_name, 

932 'operation': operation_name, 

933 'params': api_params, 

934 }, 

935 ) 

936 if operation_model.deprecated: 

937 logger.debug( 

938 'Warning: %s.%s() is deprecated', service_name, operation_name 

939 ) 

940 request_context = { 

941 'client_region': self.meta.region_name, 

942 'client_config': self.meta.config, 

943 'has_streaming_input': operation_model.has_streaming_input, 

944 'auth_type': operation_model.auth_type, 

945 } 

946 api_params = self._emit_api_params( 

947 api_params=api_params, 

948 operation_model=operation_model, 

949 context=request_context, 

950 ) 

951 ( 

952 endpoint_url, 

953 additional_headers, 

954 properties, 

955 ) = self._resolve_endpoint_ruleset( 

956 operation_model, api_params, request_context 

957 ) 

958 if properties: 

959 # Pass arbitrary endpoint info with the Request 

960 # for use during construction. 

961 request_context['endpoint_properties'] = properties 

962 request_dict = self._convert_to_request_dict( 

963 api_params=api_params, 

964 operation_model=operation_model, 

965 endpoint_url=endpoint_url, 

966 context=request_context, 

967 headers=additional_headers, 

968 ) 

969 resolve_checksum_context(request_dict, operation_model, api_params) 

970 

971 service_id = self._service_model.service_id.hyphenize() 

972 handler, event_response = self.meta.events.emit_until_response( 

973 'before-call.{service_id}.{operation_name}'.format( 

974 service_id=service_id, operation_name=operation_name 

975 ), 

976 model=operation_model, 

977 params=request_dict, 

978 request_signer=self._request_signer, 

979 context=request_context, 

980 ) 

981 

982 if event_response is not None: 

983 http, parsed_response = event_response 

984 else: 

985 maybe_compress_request( 

986 self.meta.config, request_dict, operation_model 

987 ) 

988 apply_request_checksum(request_dict) 

989 http, parsed_response = self._make_request( 

990 operation_model, request_dict, request_context 

991 ) 

992 

993 self.meta.events.emit( 

994 'after-call.{service_id}.{operation_name}'.format( 

995 service_id=service_id, operation_name=operation_name 

996 ), 

997 http_response=http, 

998 parsed=parsed_response, 

999 model=operation_model, 

1000 context=request_context, 

1001 ) 

1002 

1003 if http.status_code >= 300: 

1004 error_info = parsed_response.get("Error", {}) 

1005 error_code = error_info.get("QueryErrorCode") or error_info.get( 

1006 "Code" 

1007 ) 

1008 error_class = self.exceptions.from_code(error_code) 

1009 raise error_class(parsed_response, operation_name) 

1010 else: 

1011 return parsed_response 

1012 

1013 def _make_request(self, operation_model, request_dict, request_context): 

1014 try: 

1015 return self._endpoint.make_request(operation_model, request_dict) 

1016 except Exception as e: 

1017 self.meta.events.emit( 

1018 'after-call-error.{service_id}.{operation_name}'.format( 

1019 service_id=self._service_model.service_id.hyphenize(), 

1020 operation_name=operation_model.name, 

1021 ), 

1022 exception=e, 

1023 context=request_context, 

1024 ) 

1025 raise 

1026 

1027 def _convert_to_request_dict( 

1028 self, 

1029 api_params, 

1030 operation_model, 

1031 endpoint_url, 

1032 context=None, 

1033 headers=None, 

1034 set_user_agent_header=True, 

1035 ): 

1036 request_dict = self._serializer.serialize_to_request( 

1037 api_params, operation_model 

1038 ) 

1039 if not self._client_config.inject_host_prefix: 

1040 request_dict.pop('host_prefix', None) 

1041 if headers is not None: 

1042 request_dict['headers'].update(headers) 

1043 if set_user_agent_header: 

1044 user_agent = self._user_agent_creator.to_string() 

1045 else: 

1046 user_agent = None 

1047 prepare_request_dict( 

1048 request_dict, 

1049 endpoint_url=endpoint_url, 

1050 user_agent=user_agent, 

1051 context=context, 

1052 ) 

1053 return request_dict 

1054 

1055 def _emit_api_params(self, api_params, operation_model, context): 

1056 # Given the API params provided by the user and the operation_model 

1057 # we can serialize the request to a request_dict. 

1058 operation_name = operation_model.name 

1059 

1060 # Emit an event that allows users to modify the parameters at the 

1061 # beginning of the method. It allows handlers to modify existing 

1062 # parameters or return a new set of parameters to use. 

1063 service_id = self._service_model.service_id.hyphenize() 

1064 responses = self.meta.events.emit( 

1065 f'provide-client-params.{service_id}.{operation_name}', 

1066 params=api_params, 

1067 model=operation_model, 

1068 context=context, 

1069 ) 

1070 api_params = first_non_none_response(responses, default=api_params) 

1071 

1072 self.meta.events.emit( 

1073 f'before-parameter-build.{service_id}.{operation_name}', 

1074 params=api_params, 

1075 model=operation_model, 

1076 context=context, 

1077 ) 

1078 return api_params 

1079 

1080 def _resolve_endpoint_ruleset( 

1081 self, 

1082 operation_model, 

1083 params, 

1084 request_context, 

1085 ignore_signing_region=False, 

1086 ): 

1087 """Returns endpoint URL and list of additional headers returned from 

1088 EndpointRulesetResolver for the given operation and params. If the 

1089 ruleset resolver is not available, for example because the service has 

1090 no endpoints ruleset file, the legacy endpoint resolver's value is 

1091 returned. 

1092 

1093 Use ignore_signing_region for generating presigned URLs or any other 

1094 situation where the signing region information from the ruleset 

1095 resolver should be ignored. 

1096 

1097 Returns tuple of URL and headers dictionary. Additionally, the 

1098 request_context dict is modified in place with any signing information 

1099 returned from the ruleset resolver. 

1100 """ 

1101 if self._ruleset_resolver is None: 

1102 endpoint_url = self.meta.endpoint_url 

1103 additional_headers = {} 

1104 endpoint_properties = {} 

1105 else: 

1106 endpoint_info = self._ruleset_resolver.construct_endpoint( 

1107 operation_model=operation_model, 

1108 call_args=params, 

1109 request_context=request_context, 

1110 ) 

1111 endpoint_url = endpoint_info.url 

1112 additional_headers = endpoint_info.headers 

1113 endpoint_properties = endpoint_info.properties 

1114 # If authSchemes is present, overwrite default auth type and 

1115 # signing context derived from service model. 

1116 auth_schemes = endpoint_info.properties.get('authSchemes') 

1117 if auth_schemes is not None: 

1118 auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx( 

1119 auth_schemes 

1120 ) 

1121 auth_type, signing_context = auth_info 

1122 request_context['auth_type'] = auth_type 

1123 if 'region' in signing_context and ignore_signing_region: 

1124 del signing_context['region'] 

1125 if 'signing' in request_context: 

1126 request_context['signing'].update(signing_context) 

1127 else: 

1128 request_context['signing'] = signing_context 

1129 

1130 return endpoint_url, additional_headers, endpoint_properties 

1131 

1132 def get_paginator(self, operation_name): 

1133 """Create a paginator for an operation. 

1134 

1135 :type operation_name: string 

1136 :param operation_name: The operation name. This is the same name 

1137 as the method name on the client. For example, if the 

1138 method name is ``create_foo``, and you'd normally invoke the 

1139 operation as ``client.create_foo(**kwargs)``, if the 

1140 ``create_foo`` operation can be paginated, you can use the 

1141 call ``client.get_paginator("create_foo")``. 

1142 

1143 :raise OperationNotPageableError: Raised if the operation is not 

1144 pageable. You can use the ``client.can_paginate`` method to 

1145 check if an operation is pageable. 

1146 

1147 :rtype: L{botocore.paginate.Paginator} 

1148 :return: A paginator object. 

1149 

1150 """ 

1151 if not self.can_paginate(operation_name): 

1152 raise OperationNotPageableError(operation_name=operation_name) 

1153 else: 

1154 actual_operation_name = self._PY_TO_OP_NAME[operation_name] 

1155 

1156 # Create a new paginate method that will serve as a proxy to 

1157 # the underlying Paginator.paginate method. This is needed to 

1158 # attach a docstring to the method. 

1159 def paginate(self, **kwargs): 

1160 return Paginator.paginate(self, **kwargs) 

1161 

1162 paginator_config = self._cache['page_config'][ 

1163 actual_operation_name 

1164 ] 

1165 # Add the docstring for the paginate method. 

1166 paginate.__doc__ = PaginatorDocstring( 

1167 paginator_name=actual_operation_name, 

1168 event_emitter=self.meta.events, 

1169 service_model=self.meta.service_model, 

1170 paginator_config=paginator_config, 

1171 include_signature=False, 

1172 ) 

1173 

1174 # Rename the paginator class based on the type of paginator. 

1175 service_module_name = get_service_module_name( 

1176 self.meta.service_model 

1177 ) 

1178 paginator_class_name = ( 

1179 f"{service_module_name}.Paginator.{actual_operation_name}" 

1180 ) 

1181 

1182 # Create the new paginator class 

1183 documented_paginator_cls = type( 

1184 paginator_class_name, (Paginator,), {'paginate': paginate} 

1185 ) 

1186 

1187 operation_model = self._service_model.operation_model( 

1188 actual_operation_name 

1189 ) 

1190 paginator = documented_paginator_cls( 

1191 getattr(self, operation_name), 

1192 paginator_config, 

1193 operation_model, 

1194 ) 

1195 return paginator 

1196 

1197 def can_paginate(self, operation_name): 

1198 """Check if an operation can be paginated. 

1199 

1200 :type operation_name: string 

1201 :param operation_name: The operation name. This is the same name 

1202 as the method name on the client. For example, if the 

1203 method name is ``create_foo``, and you'd normally invoke the 

1204 operation as ``client.create_foo(**kwargs)``, if the 

1205 ``create_foo`` operation can be paginated, you can use the 

1206 call ``client.get_paginator("create_foo")``. 

1207 

1208 :return: ``True`` if the operation can be paginated, 

1209 ``False`` otherwise. 

1210 

1211 """ 

1212 if 'page_config' not in self._cache: 

1213 try: 

1214 page_config = self._loader.load_service_model( 

1215 self._service_model.service_name, 

1216 'paginators-1', 

1217 self._service_model.api_version, 

1218 )['pagination'] 

1219 self._cache['page_config'] = page_config 

1220 except DataNotFoundError: 

1221 self._cache['page_config'] = {} 

1222 actual_operation_name = self._PY_TO_OP_NAME[operation_name] 

1223 return actual_operation_name in self._cache['page_config'] 

1224 

1225 def _get_waiter_config(self): 

1226 if 'waiter_config' not in self._cache: 

1227 try: 

1228 waiter_config = self._loader.load_service_model( 

1229 self._service_model.service_name, 

1230 'waiters-2', 

1231 self._service_model.api_version, 

1232 ) 

1233 self._cache['waiter_config'] = waiter_config 

1234 except DataNotFoundError: 

1235 self._cache['waiter_config'] = {} 

1236 return self._cache['waiter_config'] 

1237 

1238 def get_waiter(self, waiter_name): 

1239 """Returns an object that can wait for some condition. 

1240 

1241 :type waiter_name: str 

1242 :param waiter_name: The name of the waiter to get. See the waiters 

1243 section of the service docs for a list of available waiters. 

1244 

1245 :returns: The specified waiter object. 

1246 :rtype: botocore.waiter.Waiter 

1247 """ 

1248 config = self._get_waiter_config() 

1249 if not config: 

1250 raise ValueError("Waiter does not exist: %s" % waiter_name) 

1251 model = waiter.WaiterModel(config) 

1252 mapping = {} 

1253 for name in model.waiter_names: 

1254 mapping[xform_name(name)] = name 

1255 if waiter_name not in mapping: 

1256 raise ValueError("Waiter does not exist: %s" % waiter_name) 

1257 

1258 return waiter.create_waiter_with_client( 

1259 mapping[waiter_name], model, self 

1260 ) 

1261 

1262 @CachedProperty 

1263 def waiter_names(self): 

1264 """Returns a list of all available waiters.""" 

1265 config = self._get_waiter_config() 

1266 if not config: 

1267 return [] 

1268 model = waiter.WaiterModel(config) 

1269 # Waiter configs is a dict, we just want the waiter names 

1270 # which are the keys in the dict. 

1271 return [xform_name(name) for name in model.waiter_names] 

1272 

1273 @property 

1274 def exceptions(self): 

1275 if self._exceptions is None: 

1276 self._exceptions = self._load_exceptions() 

1277 return self._exceptions 

1278 

1279 def _load_exceptions(self): 

1280 return self._exceptions_factory.create_client_exceptions( 

1281 self._service_model 

1282 ) 

1283 

1284 def _get_credentials(self): 

1285 """ 

1286 This private interface is subject to abrupt breaking changes, including 

1287 removal, in any botocore release. 

1288 """ 

1289 return self._request_signer._credentials 

1290 

1291 

1292class ClientMeta: 

1293 """Holds additional client methods. 

1294 

1295 This class holds additional information for clients. It exists for 

1296 two reasons: 

1297 

1298 * To give advanced functionality to clients 

1299 * To namespace additional client attributes from the operation 

1300 names which are mapped to methods at runtime. This avoids 

1301 ever running into collisions with operation names. 

1302 

1303 """ 

1304 

1305 def __init__( 

1306 self, 

1307 events, 

1308 client_config, 

1309 endpoint_url, 

1310 service_model, 

1311 method_to_api_mapping, 

1312 partition, 

1313 ): 

1314 self.events = events 

1315 self._client_config = client_config 

1316 self._endpoint_url = endpoint_url 

1317 self._service_model = service_model 

1318 self._method_to_api_mapping = method_to_api_mapping 

1319 self._partition = partition 

1320 

1321 @property 

1322 def service_model(self): 

1323 return self._service_model 

1324 

1325 @property 

1326 def region_name(self): 

1327 return self._client_config.region_name 

1328 

1329 @property 

1330 def endpoint_url(self): 

1331 return self._endpoint_url 

1332 

1333 @property 

1334 def config(self): 

1335 return self._client_config 

1336 

1337 @property 

1338 def method_to_api_mapping(self): 

1339 return self._method_to_api_mapping 

1340 

1341 @property 

1342 def partition(self): 

1343 return self._partition 

1344 

1345 

1346def _get_configured_signature_version( 

1347 service_name, client_config, scoped_config 

1348): 

1349 """ 

1350 Gets the manually configured signature version. 

1351 

1352 :returns: the customer configured signature version, or None if no 

1353 signature version was configured. 

1354 """ 

1355 # Client config overrides everything. 

1356 if client_config and client_config.signature_version is not None: 

1357 return client_config.signature_version 

1358 

1359 # Scoped config overrides picking from the endpoint metadata. 

1360 if scoped_config is not None: 

1361 # A given service may have service specific configuration in the 

1362 # config file, so we need to check there as well. 

1363 service_config = scoped_config.get(service_name) 

1364 if service_config is not None and isinstance(service_config, dict): 

1365 version = service_config.get('signature_version') 

1366 if version: 

1367 logger.debug( 

1368 "Switching signature version for service %s " 

1369 "to version %s based on config file override.", 

1370 service_name, 

1371 version, 

1372 ) 

1373 return version 

1374 return None