Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/botocore/endpoint_provider.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

309 statements  

1# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14""" 

15NOTE: All classes and functions in this module are considered private and are 

16subject to abrupt breaking changes. Please do not use them directly. 

17 

18To view the raw JSON that the objects in this module represent, please 

19go to any `endpoint-rule-set.json` file in /botocore/data/<service>/<api version>/ 

20or you can look at the test files in /tests/unit/data/endpoints/valid-rules/ 

21""" 

22 

23import logging 

24import re 

25from enum import Enum 

26from string import Formatter 

27from typing import NamedTuple 

28 

29from botocore import xform_name 

30from botocore.compat import IPV4_RE, quote, urlparse 

31from botocore.exceptions import EndpointResolutionError 

32from botocore.utils import ( 

33 ArnParser, 

34 InvalidArnException, 

35 is_valid_ipv4_endpoint_url, 

36 is_valid_ipv6_endpoint_url, 

37 lru_cache_weakref, 

38 normalize_url_path, 

39 percent_encode, 

40) 

41 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches one `{...}` placeholder whose content is ASCII letters and `#`.
TEMPLATE_STRING_RE = re.compile(r"\{[a-zA-Z#]+\}")
# Splits a path segment such as `name[0]` into its (name, index) parts.
GET_ATTR_RE = re.compile(r"(\w*)\[(\d+)\]")
# 1-63 letters, digits or hyphens; neither the first nor the last
# character may be a hyphen.
VALID_HOST_LABEL_RE = re.compile(r"^(?!-)[a-zA-Z\d-]{1,63}(?<!-)$")
# `maxsize` handed to the cache decorating `EndpointProvider.resolve_endpoint`.
CACHE_SIZE = 100
# Shared helper instances reused by every `RuleSetStandardLibrary`.
ARN_PARSER = ArnParser()
STRING_FORMATTER = Formatter()

52 

53 

class RuleSetStandardLibrary:
    """Rule actions to be performed by the EndpointProvider."""

    def __init__(self, partitions_data):
        # `aws_partition` reads the list stored under the "partitions" key.
        self.partitions_data = partitions_data

    def is_func(self, argument):
        """Determine if an object is a function object.

        :type argument: Any
        :rtype: bool
        """
        return isinstance(argument, dict) and "fn" in argument

    def is_ref(self, argument):
        """Determine if an object is a reference object.

        :type argument: Any
        :rtype: bool
        """
        return isinstance(argument, dict) and "ref" in argument

    def is_template(self, argument):
        """Determine if an object contains a template string.

        :type argument: Any
        :rtype: bool
        """
        return (
            isinstance(argument, str)
            and TEMPLATE_STRING_RE.search(argument) is not None
        )

    def resolve_template_string(self, value, scope_vars):
        """Resolve and inject values into a template string.

        Each `{a#b}` placeholder is looked up in `scope_vars` by walking the
        `#`-separated keys in order (`scope_vars["a"]["b"]`).

        :type value: str
        :type scope_vars: dict
        :rtype: str
        """
        pieces = []
        for literal, reference, _, _ in STRING_FORMATTER.parse(value):
            pieces.append(literal)
            if reference is not None:
                template_value = scope_vars
                for param in reference.split("#"):
                    template_value = template_value[param]
                pieces.append(f"{template_value}")
        return "".join(pieces)

    def resolve_value(self, value, scope_vars):
        """Return evaluated value based on type.

        Function objects are called, reference objects are looked up in
        `scope_vars` (None when absent), template strings are rendered, and
        anything else is returned unchanged.

        :type value: Any
        :type scope_vars: dict
        :rtype: Any
        """
        if self.is_func(value):
            return self.call_function(value, scope_vars)
        elif self.is_ref(value):
            return scope_vars.get(value["ref"])
        elif self.is_template(value):
            return self.resolve_template_string(value, scope_vars)

        return value

    def convert_func_name(self, value):
        """Normalize function names.

        The name is passed through `xform_name`, `not` gets a leading
        underscore so it maps onto the `_not` method, and every `.` is
        replaced by `_`.

        :type value: str
        :rtype: str
        """
        normalized_name = f"{xform_name(value)}"
        if normalized_name == "not":
            normalized_name = f"_{normalized_name}"
        return normalized_name.replace(".", "_")

    def call_function(self, func_signature, scope_vars):
        """Call the function with the resolved arguments and assign to `scope_vars`
        when applicable.

        :type func_signature: dict
        :type scope_vars: dict
        :rtype: Any
        :raises EndpointResolutionError: if the `assign` name is already
            present in `scope_vars`.
        """
        func_args = [
            self.resolve_value(arg, scope_vars)
            for arg in func_signature["argv"]
        ]
        func_name = self.convert_func_name(func_signature["fn"])
        func = getattr(self, func_name)
        result = func(*func_args)
        if "assign" in func_signature:
            assign = func_signature["assign"]
            if assign in scope_vars:
                raise EndpointResolutionError(
                    msg=f"Assignment {assign} already exists in "
                    "scoped variables and cannot be overwritten"
                )
            scope_vars[assign] = result
        return result

    def is_set(self, value):
        """Evaluates whether a value is set.

        :type value: Any
        :rtype: bool
        """
        return value is not None

    def get_attr(self, value, path):
        """Find an attribute within a value given a path string. The path can contain
        the name of the attribute and an index in brackets. A period separating attribute
        names indicates the one to the right is nested. The index will always occur at
        the end of the path.

        :type value: dict or tuple
        :type path: str
        :rtype: Any
        """
        for part in path.split("."):
            match = GET_ATTR_RE.search(part)
            if match is not None:
                name, index = match.groups()
                index = int(index)
                if name:
                    value = value.get(name)
                # A missing container or an out-of-range index yields None
                # rather than raising.
                if value is None or index >= len(value):
                    return None
                # An indexed segment ends the traversal.
                return value[index]
            else:
                value = value[part]
        return value

    def format_partition_output(self, partition):
        """Build the result for a matched partition.

        Returns a new dict holding the partition's `outputs` plus a `name`
        entry set to the partition `id`. A copy is returned so the shared
        partition data handed to `__init__` is never modified.

        :type partition: dict
        :rtype: dict
        """
        return {**partition["outputs"], "name": partition["id"]}

    def is_partition_match(self, region, partition):
        """Check whether `region` is listed in the partition's `regions` or
        matches its `regionRegex`.

        :type region: str
        :type partition: dict
        :rtype: bool
        """
        matches_regex = re.match(partition["regionRegex"], region) is not None
        return region in partition["regions"] or matches_regex

    def aws_partition(self, value):
        """Match a region string to an AWS partition.

        :type value: str
        :rtype: dict
        """
        partitions = self.partitions_data['partitions']

        if value is not None:
            for partition in partitions:
                if self.is_partition_match(value, partition):
                    return self.format_partition_output(partition)

        # return the default partition if no matches were found
        aws_partition = partitions[0]
        return self.format_partition_output(aws_partition)

    def aws_parse_arn(self, value):
        """Parse and validate string for ARN components.

        Returns None when the value is None, does not start with `arn:`,
        fails to parse, or lacks a partition, service or resource. Otherwise
        the parsed dict is returned with `account` renamed to `accountId`
        and `resource` replaced by `resourceId`, the resource split on `:`
        and `/`.

        :type value: str
        :rtype: dict
        """
        if value is None or not value.startswith("arn:"):
            return None

        try:
            arn_dict = ARN_PARSER.parse_arn(value)
        except InvalidArnException:
            return None

        # partition, resource, and service are required
        if not all(
            (arn_dict["partition"], arn_dict["service"], arn_dict["resource"])
        ):
            return None

        arn_dict["accountId"] = arn_dict.pop("account")

        resource = arn_dict.pop("resource")
        arn_dict["resourceId"] = resource.replace(":", "/").split("/")

        return arn_dict

    def is_valid_host_label(self, value, allow_subdomains):
        """Evaluates whether a value is a valid host label per
        RFC 1123. If allow_subdomains is True, split on `.` and validate
        each component separately.

        :type value: str
        :type allow_subdomains: bool
        :rtype: bool
        """
        if value is None:
            return False

        if allow_subdomains is True:
            return all(
                self.is_valid_host_label(label, False)
                for label in value.split(".")
            )

        if allow_subdomains is False and "." in value:
            return False

        # `fullmatch` is required here: with `match`, the pattern's trailing
        # `$` also succeeds just before a final newline, which would accept
        # labels such as "abc\n".
        return VALID_HOST_LABEL_RE.fullmatch(value) is not None

    def string_equals(self, value1, value2):
        """Evaluates two string values for equality.

        :type value1: str
        :type value2: str
        :rtype: bool
        :raises EndpointResolutionError: if either value is not a string.
        """
        if not all(isinstance(val, str) for val in (value1, value2)):
            msg = f"Both values must be strings, not {type(value1)} and {type(value2)}."
            raise EndpointResolutionError(msg=msg)
        return value1 == value2

    def uri_encode(self, value):
        """Perform percent-encoding on an input string.

        :type value: str
        :rtype: str
        """
        if value is None:
            return None

        return percent_encode(value)

    def parse_url(self, value):
        """Parse a URL string into components.

        Returns None when the value is None, cannot be parsed, has a port
        that is not a valid integer, uses a scheme other than http/https, or
        carries a query string. Otherwise returns a dict with the keys
        `scheme`, `authority`, `path`, `normalizedPath` (always ending in
        `/`) and `isIp`.

        :type value: str
        :rtype: dict
        """
        if value is None:
            return None

        try:
            # Parsing itself can raise ValueError for a malformed authority,
            # so it sits inside the same guard as the port check.
            url_components = urlparse(value)
            # url_parse may assign non-integer values to
            # `port` and will fail when accessed.
            url_components.port
        except ValueError:
            return None

        scheme = url_components.scheme
        query = url_components.query
        # URLs with queries are not supported
        if scheme not in ("https", "http") or len(query) > 0:
            return None

        path = url_components.path
        normalized_path = quote(normalize_url_path(path))
        if not normalized_path.endswith("/"):
            normalized_path = f"{normalized_path}/"

        return {
            "scheme": scheme,
            "authority": url_components.netloc,
            "path": path,
            "normalizedPath": normalized_path,
            "isIp": is_valid_ipv4_endpoint_url(value)
            or is_valid_ipv6_endpoint_url(value),
        }

    def boolean_equals(self, value1, value2):
        """Evaluates two boolean values for equality.

        :type value1: bool
        :type value2: bool
        :rtype: bool
        :raises EndpointResolutionError: if either value is not a bool.
        """
        if not all(isinstance(val, bool) for val in (value1, value2)):
            msg = f"Both arguments must be bools, not {type(value1)} and {type(value2)}."
            raise EndpointResolutionError(msg=msg)
        return value1 is value2

    def is_ascii(self, value):
        """Evaluates if a string only contains ASCII characters.

        :type value: str
        :rtype: bool
        """
        try:
            value.encode("ascii")
            return True
        except UnicodeEncodeError:
            return False

    def substring(self, value, start, stop, reverse):
        """Computes a substring given the start index and end index. If `reverse` is
        True, slice the string from the end instead.

        Returns None when `start >= stop`, when `stop` exceeds the length of
        the string, or when the string contains non-ASCII characters.

        :type value: str
        :type start: int
        :type stop: int
        :type reverse: bool
        :rtype: str
        :raises EndpointResolutionError: if `value` is not a string.
        """
        if not isinstance(value, str):
            msg = f"Input must be a string, not {type(value)}."
            raise EndpointResolutionError(msg=msg)
        if start >= stop or len(value) < stop or not self.is_ascii(value):
            return None

        if reverse is True:
            # Mirror the indices so they count from the end of the string.
            r_start = len(value) - stop
            r_stop = len(value) - start
            return value[r_start:r_stop]

        return value[start:stop]

    def _not(self, value):
        """A function implementation of the logical operator `not`.

        :type value: Any
        :rtype: bool
        """
        return not value

    def aws_is_virtual_hostable_s3_bucket(self, value, allow_subdomains):
        """Evaluates whether a value is a valid bucket name for virtual host
        style bucket URLs. To pass, the value must meet the following criteria:
        1. is_valid_host_label(value) is True
        2. length between 3 and 63 characters (inclusive)
        3. does not contain uppercase characters
        4. is not formatted as an IP address

        If allow_subdomains is True, split on `.` and validate
        each component separately.

        Only the lower length bound is checked here; the upper bound of 63
        is applied per label by `is_valid_host_label`.

        :type value: str
        :type allow_subdomains: bool
        :rtype: bool
        """
        if (
            value is None
            or len(value) < 3
            or value.lower() != value
            or IPV4_RE.match(value) is not None
        ):
            return False

        return self.is_valid_host_label(
            value, allow_subdomains=allow_subdomains
        )


# maintains backwards compatibility as `Library` was misspelled
# in earlier versions
RuleSetStandardLibary = RuleSetStandardLibrary

409 

410 

class BaseRule:
    """Common behaviour shared by every kind of endpoint rule."""

    def __init__(self, conditions, documentation=None):
        self.conditions = conditions
        self.documentation = documentation

    def evaluate(self, scope_vars, rule_lib):
        """Evaluate the rule; concrete rule classes must override this."""
        raise NotImplementedError()

    def evaluate_conditions(self, scope_vars, rule_lib):
        """Report whether every condition of this rule holds.

        Conditions are called in order through `rule_lib.call_function`.
        Evaluation stops at the first one that returns `False` or `None`;
        any other return value counts as satisfied.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: bool
        """
        # The generator is lazy, so `all` stops calling conditions as soon
        # as one of them fails.
        outcomes = (
            rule_lib.call_function(condition, scope_vars)
            for condition in self.conditions
        )
        return all(
            outcome is not None and outcome is not False
            for outcome in outcomes
        )

433 

434 

class RuleSetEndpoint(NamedTuple):
    """Immutable result of a rule that resolved to an endpoint."""

    # Resolved endpoint URL.
    url: str
    # Endpoint properties after template resolution.
    properties: dict
    # Header name mapped to its list of resolved values.
    headers: dict

441 

442 

class EndpointRule(BaseRule):
    """Rule that yields a `RuleSetEndpoint` once its conditions are met."""

    def __init__(self, endpoint, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint

    def evaluate(self, scope_vars, rule_lib):
        """Resolve the endpoint if the rule's conditions are satisfied.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: RuleSetEndpoint
        :returns: The resolved endpoint, or None when a condition fails.
        """
        if not self.evaluate_conditions(scope_vars, rule_lib):
            return None

        endpoint_spec = self.endpoint
        resolved_url = rule_lib.resolve_value(endpoint_spec["url"], scope_vars)
        resolved_properties = self.resolve_properties(
            endpoint_spec.get("properties", {}),
            scope_vars,
            rule_lib,
        )
        resolved_headers = self.resolve_headers(scope_vars, rule_lib)
        return RuleSetEndpoint(
            url=resolved_url,
            properties=resolved_properties,
            headers=resolved_headers,
        )

    def resolve_properties(self, properties, scope_vars, rule_lib):
        """Recursively walk `properties`, rendering template strings.

        Lists and dicts are rebuilt with each element or value resolved;
        template strings are rendered; everything else is returned as is.

        :type properties: dict/list/str
        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: dict
        """
        if isinstance(properties, list):
            resolved_items = []
            for item in properties:
                resolved_items.append(
                    self.resolve_properties(item, scope_vars, rule_lib)
                )
            return resolved_items

        if isinstance(properties, dict):
            resolved_mapping = {}
            for key, item in properties.items():
                resolved_mapping[key] = self.resolve_properties(
                    item, scope_vars, rule_lib
                )
            return resolved_mapping

        if rule_lib.is_template(properties):
            return rule_lib.resolve_template_string(properties, scope_vars)

        return properties

    def resolve_headers(self, scope_vars, rule_lib):
        """Resolve every value of every header declared on the endpoint.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: dict
        """
        declared_headers = self.endpoint.get("headers", {})
        return {
            header: [
                rule_lib.resolve_value(item, scope_vars) for item in values
            ]
            for header, values in declared_headers.items()
        }

506 

507 

class ErrorRule(BaseRule):
    """Rule that raises an error once its conditions are met."""

    def __init__(self, error, **kwargs):
        super().__init__(**kwargs)
        self.error = error

    def evaluate(self, scope_vars, rule_lib):
        """Raise the rule's resolved error when all of its conditions hold.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :returns: None when a condition fails.
        :raises EndpointResolutionError: when every condition is met; the
            message is the resolved `error` value.
        """
        if not self.evaluate_conditions(scope_vars, rule_lib):
            return None

        message = rule_lib.resolve_value(self.error, scope_vars)
        raise EndpointResolutionError(msg=message)

524 

525 

class TreeRule(BaseRule):
    """A tree rule is non-terminal meaning it will never be returned to a provider.
    Additionally this means it has no attributes that need to be resolved.
    """

    def __init__(self, rules, **kwargs):
        super().__init__(**kwargs)
        # Each sub-rule description is turned into a rule object up front.
        self.rules = [RuleCreator.create(**rule) for rule in rules]

    def evaluate(self, scope_vars, rule_lib):
        """If a tree rule's conditions are met, iterate its sub-rules
        and return first result found.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: RuleSetEndpoint/EndpointResolutionError
        :returns: The first non-None sub-rule result, or None when the
            conditions fail or no sub-rule produces a result.
        """
        if self.evaluate_conditions(scope_vars, rule_lib):
            for rule in self.rules:
                # don't share scope_vars between rules
                rule_result = rule.evaluate(scope_vars.copy(), rule_lib)
                # "No match" is signalled by None, the same test
                # `RuleSet.evaluate` applies to top-level rules.
                if rule_result is not None:
                    return rule_result
        return None

550 

551 

class RuleCreator:
    """Factory mapping a rule's `type` string onto its rule class."""

    # The attribute name is the `type` value found in the rule description.
    endpoint = EndpointRule
    error = ErrorRule
    tree = TreeRule

    @classmethod
    def create(cls, **kwargs):
        """Create a rule instance from metadata.

        The `type` entry selects the rule class; the remaining keyword
        arguments are passed to that class's constructor.

        :rtype: TreeRule/EndpointRule/ErrorRule
        :raises EndpointResolutionError: if `type` does not name a rule class.
        """
        rule_type = kwargs.pop("type")
        rule_class = (
            getattr(cls, rule_type, None)
            if isinstance(rule_type, str)
            else None
        )
        # Accept only real rule classes. A bare attribute lookup would also
        # resolve names such as "create" or "__class__" and fail later with
        # an unrelated error instead of the message below.
        if not (
            isinstance(rule_class, type) and issubclass(rule_class, BaseRule)
        ):
            raise EndpointResolutionError(
                msg=f"Unknown rule type: {rule_type}. A rule must "
                "be of type tree, endpoint or error."
            )
        return rule_class(**kwargs)

573 

574 

class ParameterType(Enum):
    """Maps a parameter's `type` attribute onto the matching Python type."""

    # Member names are the lower-cased `type` strings that
    # `ParameterDefinition` looks up; values are the types it checks
    # inputs against.
    string = str
    boolean = bool
    stringarray = tuple

581 

582 

class ParameterDefinition:
    """Specification of a single parameter declared by a RuleSet."""

    def __init__(
        self,
        name,
        parameter_type,
        documentation=None,
        builtIn=None,
        default=None,
        required=None,
        deprecated=None,
    ):
        self.name = name
        try:
            type_member = getattr(ParameterType, parameter_type.lower())
            self.parameter_type = type_member.value
        except AttributeError:
            raise EndpointResolutionError(
                msg=f"Unknown parameter type: {parameter_type}. "
                "A parameter must be of type string, boolean, or stringarray."
            )
        self.documentation = documentation
        self.builtin = builtIn
        self.default = default
        self.required = required
        self.deprecated = deprecated

    def validate_input(self, value):
        """Check an input value against this parameter's declared type.

        When the parameter is marked deprecated, a notice is logged at INFO
        level, including the optional `message` and `since` details.

        :type value: Any
        :raises EndpointResolutionError: if `value` is not an instance of
            the declared parameter type.
        """
        if not isinstance(value, self.parameter_type):
            raise EndpointResolutionError(
                msg=f"Value ({self.name}) is the wrong "
                f"type. Must be {self.parameter_type}."
            )

        if self.deprecated is not None:
            notice_lines = [f"{self.name} has been deprecated."]
            detail = self.deprecated.get("message")
            since = self.deprecated.get("since")
            if detail:
                notice_lines.append(f"{detail}")
            if since:
                notice_lines.append(f"Deprecated since {since}.")
            logger.info("\n".join(notice_lines))

        return None

    def process_input(self, value):
        """Process input against spec, applying default if value is None.

        :type value: Any
        :rtype: Any
        :raises EndpointResolutionError: if no value was given for a
            required parameter that has no default, or if the value has the
            wrong type.
        """
        if value is not None:
            self.validate_input(value)
            return value

        if self.default is not None:
            return self.default

        if self.required:
            raise EndpointResolutionError(
                msg=f"Cannot find value for required parameter {self.name}"
            )

        # optional parameter without a default: it stays None
        return value

649 

650 

class RuleSet:
    """A set of rules used to derive a routable service endpoint."""

    def __init__(
        self, version, parameters, rules, partitions, documentation=None
    ):
        self.version = version
        self.parameters = self._ingest_parameter_spec(parameters)
        self.rules = [RuleCreator.create(**rule) for rule in rules]
        self.rule_lib = RuleSetStandardLibrary(partitions)
        self.documentation = documentation

    def _ingest_parameter_spec(self, parameters):
        """Build a `ParameterDefinition` for every declared parameter.

        :type parameters: dict
        :rtype: dict
        """
        definitions = {}
        for name, spec in parameters.items():
            definitions[name] = ParameterDefinition(
                name,
                spec["type"],
                documentation=spec.get("documentation"),
                builtIn=spec.get("builtIn"),
                default=spec.get("default"),
                required=spec.get("required"),
                deprecated=spec.get("deprecated"),
            )
        return definitions

    def process_input_parameters(self, input_params):
        """Run each declared parameter's spec against the supplied input.

        `input_params` is updated in place: any parameter whose processed
        value is not None is written back.

        :type input_params: dict
        """
        for name, definition in self.parameters.items():
            processed = definition.process_input(input_params.get(name))
            if processed is None:
                continue
            input_params[name] = processed
        return None

    def evaluate(self, input_parameters):
        """Evaluate the rules in order and return the first non-None result.

        :type input_parameters: dict
        :returns: The first rule result, or None if no rule produced one.
        """
        self.process_input_parameters(input_parameters)
        for rule in self.rules:
            # every rule gets its own copy of the processed parameters
            scope = input_parameters.copy()
            outcome = rule.evaluate(scope, self.rule_lib)
            if outcome is not None:
                return outcome
        return None

699 

700 

class EndpointProvider:
    """Resolves endpoints for input parameters using a RuleSet."""

    def __init__(self, ruleset_data, partition_data):
        self.ruleset = RuleSet(**ruleset_data, partitions=partition_data)

    @lru_cache_weakref(maxsize=CACHE_SIZE)
    def resolve_endpoint(self, **input_parameters):
        """Find the endpoint matching the given input parameters.

        :type input_parameters: dict
        :rtype: RuleSetEndpoint
        :raises EndpointResolutionError: if no rule yields an endpoint.
        """
        # Snapshot the caller's parameters first: evaluating the rule set
        # writes processed values back into `input_parameters`.
        params_for_error = input_parameters.copy()
        endpoint = self.ruleset.evaluate(input_parameters)
        if endpoint is not None:
            return endpoint

        param_string = "\n".join(
            f"{key}: {value}" for key, value in params_for_error.items()
        )
        raise EndpointResolutionError(
            msg=f"No endpoint found for parameters:\n{param_string}"
        )