Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/endpoint_provider.py: 27%

306 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14""" 

15NOTE: All classes and functions in this module are considered private and are 

16subject to abrupt breaking changes. Please do not use them directly. 

17 

18To view the raw JSON that the objects in this module represent, please 

19go to any `endpoint-rule-set.json` file in /botocore/data/<service>/<api version>/ 

20or you can look at the test files in /tests/unit/data/endpoints/valid-rules/ 

21""" 

22 

23 

24import logging 

25import re 

26from enum import Enum 

27from string import Formatter 

28from typing import NamedTuple 

29 

30from botocore import xform_name 

31from botocore.compat import IPV4_RE, quote, urlparse 

32from botocore.exceptions import EndpointResolutionError 

33from botocore.utils import ( 

34 ArnParser, 

35 InvalidArnException, 

36 is_valid_ipv4_endpoint_url, 

37 is_valid_ipv6_endpoint_url, 

38 lru_cache_weakref, 

39 normalize_url_path, 

40 percent_encode, 

41) 

42 

43logger = logging.getLogger(__name__) 

44 

45TEMPLATE_STRING_RE = re.compile(r"\{[a-zA-Z#]+\}") 

46GET_ATTR_RE = re.compile(r"(\w+)\[(\d+)\]") 

47VALID_HOST_LABEL_RE = re.compile( 

48 r"^(?!-)[a-zA-Z\d-]{1,63}(?<!-)$", 

49) 

50CACHE_SIZE = 100 

51ARN_PARSER = ArnParser() 

52STRING_FORMATTER = Formatter() 

53 

54 

55class RuleSetStandardLibrary: 

56 """Rule actions to be performed by the EndpointProvider.""" 

57 

58 def __init__(self, partitions_data): 

59 self.partitions_data = partitions_data 

60 

61 def is_func(self, argument): 

62 """Determine if an object is a function object. 

63 

64 :type argument: Any 

65 :rtype: bool 

66 """ 

67 return isinstance(argument, dict) and "fn" in argument 

68 

69 def is_ref(self, argument): 

70 """Determine if an object is a reference object. 

71 

72 :type argument: Any 

73 :rtype: bool 

74 """ 

75 return isinstance(argument, dict) and "ref" in argument 

76 

77 def is_template(self, argument): 

78 """Determine if an object contains a template string. 

79 

80 :type argument: Any 

81 :rtpe: bool 

82 """ 

83 return ( 

84 isinstance(argument, str) 

85 and TEMPLATE_STRING_RE.search(argument) is not None 

86 ) 

87 

88 def resolve_template_string(self, value, scope_vars): 

89 """Resolve and inject values into a template string. 

90 

91 :type value: str 

92 :type scope_vars: dict 

93 :rtype: str 

94 """ 

95 result = "" 

96 for literal, reference, _, _ in STRING_FORMATTER.parse(value): 

97 if reference is not None: 

98 template_value = scope_vars 

99 template_params = reference.split("#") 

100 for param in template_params: 

101 template_value = template_value[param] 

102 result += f"{literal}{template_value}" 

103 else: 

104 result += literal 

105 return result 

106 

107 def resolve_value(self, value, scope_vars): 

108 """Return evaluated value based on type. 

109 

110 :type value: Any 

111 :type scope_vars: dict 

112 :rtype: Any 

113 """ 

114 if self.is_func(value): 

115 return self.call_function(value, scope_vars) 

116 elif self.is_ref(value): 

117 return scope_vars.get(value["ref"]) 

118 elif self.is_template(value): 

119 return self.resolve_template_string(value, scope_vars) 

120 

121 return value 

122 

123 def convert_func_name(self, value): 

124 """Normalize function names. 

125 

126 :type value: str 

127 :rtype: str 

128 """ 

129 normalized_name = f"{xform_name(value)}" 

130 if normalized_name == "not": 

131 normalized_name = f"_{normalized_name}" 

132 return normalized_name.replace(".", "_") 

133 

134 def call_function(self, func_signature, scope_vars): 

135 """Call the function with the resolved arguments and assign to `scope_vars` 

136 when applicable. 

137 

138 :type func_signature: dict 

139 :type scope_vars: dict 

140 :rtype: Any 

141 """ 

142 func_args = [ 

143 self.resolve_value(arg, scope_vars) 

144 for arg in func_signature["argv"] 

145 ] 

146 func_name = self.convert_func_name(func_signature["fn"]) 

147 func = getattr(self, func_name) 

148 result = func(*func_args) 

149 if "assign" in func_signature: 

150 assign = func_signature["assign"] 

151 if assign in scope_vars: 

152 raise EndpointResolutionError( 

153 msg=f"Assignment {assign} already exists in " 

154 "scoped variables and cannot be overwritten" 

155 ) 

156 scope_vars[assign] = result 

157 return result 

158 

159 def is_set(self, value): 

160 """Evaluates whether a value is set. 

161 

162 :type value: Any 

163 :rytpe: bool 

164 """ 

165 return value is not None 

166 

167 def get_attr(self, value, path): 

168 """Find an attribute within a value given a path string. The path can contain 

169 the name of the attribute and an index in brackets. A period separating attribute 

170 names indicates the one to the right is nested. The index will always occur at 

171 the end of the path. 

172 

173 :type value: dict or list 

174 :type path: str 

175 :rtype: Any 

176 """ 

177 for part in path.split("."): 

178 match = GET_ATTR_RE.search(part) 

179 if match is not None: 

180 name, index = match.groups() 

181 index = int(index) 

182 value = value.get(name) 

183 if value is None or index >= len(value): 

184 return None 

185 return value[index] 

186 else: 

187 value = value[part] 

188 return value 

189 

190 def format_partition_output(self, partition): 

191 output = partition["outputs"] 

192 output["name"] = partition["id"] 

193 return output 

194 

195 def is_partition_match(self, region, partition): 

196 matches_regex = re.match(partition["regionRegex"], region) is not None 

197 return region in partition["regions"] or matches_regex 

198 

199 def aws_partition(self, value): 

200 """Match a region string to an AWS partition. 

201 

202 :type value: str 

203 :rtype: dict 

204 """ 

205 partitions = self.partitions_data['partitions'] 

206 

207 if value is not None: 

208 for partition in partitions: 

209 if self.is_partition_match(value, partition): 

210 return self.format_partition_output(partition) 

211 

212 # return the default partition if no matches were found 

213 aws_partition = partitions[0] 

214 return self.format_partition_output(aws_partition) 

215 

216 def aws_parse_arn(self, value): 

217 """Parse and validate string for ARN components. 

218 

219 :type value: str 

220 :rtype: dict 

221 """ 

222 if value is None or not value.startswith("arn:"): 

223 return None 

224 

225 try: 

226 arn_dict = ARN_PARSER.parse_arn(value) 

227 except InvalidArnException: 

228 return None 

229 

230 # partition, resource, and service are required 

231 if not all( 

232 (arn_dict["partition"], arn_dict["service"], arn_dict["resource"]) 

233 ): 

234 return None 

235 

236 arn_dict["accountId"] = arn_dict.pop("account") 

237 

238 resource = arn_dict.pop("resource") 

239 arn_dict["resourceId"] = resource.replace(":", "/").split("/") 

240 

241 return arn_dict 

242 

243 def is_valid_host_label(self, value, allow_subdomains): 

244 """Evaluates whether a value is a valid host label per 

245 RFC 1123. If allow_subdomains is True, split on `.` and validate 

246 each component separately. 

247 

248 :type value: str 

249 :type allow_subdomains: bool 

250 :rtype: bool 

251 """ 

252 if value is None or allow_subdomains is False and value.count(".") > 0: 

253 return False 

254 

255 if allow_subdomains is True: 

256 return all( 

257 self.is_valid_host_label(label, False) 

258 for label in value.split(".") 

259 ) 

260 

261 return VALID_HOST_LABEL_RE.match(value) is not None 

262 

263 def string_equals(self, value1, value2): 

264 """Evaluates two string values for equality. 

265 

266 :type value1: str 

267 :type value2: str 

268 :rtype: bool 

269 """ 

270 if not all(isinstance(val, str) for val in (value1, value2)): 

271 msg = f"Both values must be strings, not {type(value1)} and {type(value2)}." 

272 raise EndpointResolutionError(msg=msg) 

273 return value1 == value2 

274 

275 def uri_encode(self, value): 

276 """Perform percent-encoding on an input string. 

277 

278 :type value: str 

279 :rytpe: str 

280 """ 

281 if value is None: 

282 return None 

283 

284 return percent_encode(value) 

285 

286 def parse_url(self, value): 

287 """Parse a URL string into components. 

288 

289 :type value: str 

290 :rtype: dict 

291 """ 

292 if value is None: 

293 return None 

294 

295 url_components = urlparse(value) 

296 try: 

297 # url_parse may assign non-integer values to 

298 # `port` and will fail when accessed. 

299 url_components.port 

300 except ValueError: 

301 return None 

302 

303 scheme = url_components.scheme 

304 query = url_components.query 

305 # URLs with queries are not supported 

306 if scheme not in ("https", "http") or len(query) > 0: 

307 return None 

308 

309 path = url_components.path 

310 normalized_path = quote(normalize_url_path(path)) 

311 if not normalized_path.endswith("/"): 

312 normalized_path = f"{normalized_path}/" 

313 

314 return { 

315 "scheme": scheme, 

316 "authority": url_components.netloc, 

317 "path": path, 

318 "normalizedPath": normalized_path, 

319 "isIp": is_valid_ipv4_endpoint_url(value) 

320 or is_valid_ipv6_endpoint_url(value), 

321 } 

322 

323 def boolean_equals(self, value1, value2): 

324 """Evaluates two boolean values for equality. 

325 

326 :type value1: bool 

327 :type value2: bool 

328 :rtype: bool 

329 """ 

330 if not all(isinstance(val, bool) for val in (value1, value2)): 

331 msg = f"Both arguments must be bools, not {type(value1)} and {type(value2)}." 

332 raise EndpointResolutionError(msg=msg) 

333 return value1 is value2 

334 

335 def is_ascii(self, value): 

336 """Evaluates if a string only contains ASCII characters. 

337 

338 :type value: str 

339 :rtype: bool 

340 """ 

341 try: 

342 value.encode("ascii") 

343 return True 

344 except UnicodeEncodeError: 

345 return False 

346 

347 def substring(self, value, start, stop, reverse): 

348 """Computes a substring given the start index and end index. If `reverse` is 

349 True, slice the string from the end instead. 

350 

351 :type value: str 

352 :type start: int 

353 :type end: int 

354 :type reverse: bool 

355 :rtype: str 

356 """ 

357 if not isinstance(value, str): 

358 msg = f"Input must be a string, not {type(value)}." 

359 raise EndpointResolutionError(msg=msg) 

360 if start >= stop or len(value) < stop or not self.is_ascii(value): 

361 return None 

362 

363 if reverse is True: 

364 r_start = len(value) - stop 

365 r_stop = len(value) - start 

366 return value[r_start:r_stop] 

367 

368 return value[start:stop] 

369 

370 def _not(self, value): 

371 """A function implementation of the logical operator `not`. 

372 

373 :type value: Any 

374 :rtype: bool 

375 """ 

376 return not value 

377 

378 def aws_is_virtual_hostable_s3_bucket(self, value, allow_subdomains): 

379 """Evaluates whether a value is a valid bucket name for virtual host 

380 style bucket URLs. To pass, the value must meet the following criteria: 

381 1. is_valid_host_label(value) is True 

382 2. length between 3 and 63 characters (inclusive) 

383 3. does not contain uppercase characters 

384 4. is not formatted as an IP address 

385 

386 If allow_subdomains is True, split on `.` and validate 

387 each component separately. 

388 

389 :type value: str 

390 :type allow_subdomains: bool 

391 :rtype: bool 

392 """ 

393 if ( 

394 value is None 

395 or len(value) < 3 

396 or value.lower() != value 

397 or IPV4_RE.match(value) is not None 

398 ): 

399 return False 

400 

401 return self.is_valid_host_label( 

402 value, allow_subdomains=allow_subdomains 

403 ) 

404 

405 

406# maintains backwards compatibility as `Library` was misspelled 

407# in earlier versions 

408RuleSetStandardLibary = RuleSetStandardLibrary 

409 

410 

411class BaseRule: 

412 """Base interface for individual endpoint rules.""" 

413 

414 def __init__(self, conditions, documentation=None): 

415 self.conditions = conditions 

416 self.documentation = documentation 

417 

418 def evaluate(self, scope_vars, rule_lib): 

419 raise NotImplementedError() 

420 

421 def evaluate_conditions(self, scope_vars, rule_lib): 

422 """Determine if all conditions in a rule are met. 

423 

424 :type scope_vars: dict 

425 :type rule_lib: RuleSetStandardLibrary 

426 :rtype: bool 

427 """ 

428 for func_signature in self.conditions: 

429 result = rule_lib.call_function(func_signature, scope_vars) 

430 if result is False or result is None: 

431 return False 

432 return True 

433 

434 

435class RuleSetEndpoint(NamedTuple): 

436 """A resolved endpoint object returned by a rule.""" 

437 

438 url: str 

439 properties: dict 

440 headers: dict 

441 

442 

443class EndpointRule(BaseRule): 

444 def __init__(self, endpoint, **kwargs): 

445 super().__init__(**kwargs) 

446 self.endpoint = endpoint 

447 

448 def evaluate(self, scope_vars, rule_lib): 

449 """Determine if conditions are met to provide a valid endpoint. 

450 

451 :type scope_vars: dict 

452 :rtype: RuleSetEndpoint 

453 """ 

454 if self.evaluate_conditions(scope_vars, rule_lib): 

455 url = rule_lib.resolve_value(self.endpoint["url"], scope_vars) 

456 properties = self.resolve_properties( 

457 self.endpoint.get("properties", {}), 

458 scope_vars, 

459 rule_lib, 

460 ) 

461 headers = self.resolve_headers(scope_vars, rule_lib) 

462 return RuleSetEndpoint( 

463 url=url, properties=properties, headers=headers 

464 ) 

465 

466 return None 

467 

468 def resolve_properties(self, properties, scope_vars, rule_lib): 

469 """Traverse `properties` attribute, resolving any template strings. 

470 

471 :type properties: dict/list/str 

472 :type scope_vars: dict 

473 :type rule_lib: RuleSetStandardLibrary 

474 :rtype: dict 

475 """ 

476 if isinstance(properties, list): 

477 return [ 

478 self.resolve_properties(prop, scope_vars, rule_lib) 

479 for prop in properties 

480 ] 

481 elif isinstance(properties, dict): 

482 return { 

483 key: self.resolve_properties(value, scope_vars, rule_lib) 

484 for key, value in properties.items() 

485 } 

486 elif rule_lib.is_template(properties): 

487 return rule_lib.resolve_template_string(properties, scope_vars) 

488 

489 return properties 

490 

491 def resolve_headers(self, scope_vars, rule_lib): 

492 """Iterate through headers attribute resolving all values. 

493 

494 :type scope_vars: dict 

495 :type rule_lib: RuleSetStandardLibrary 

496 :rtype: dict 

497 """ 

498 resolved_headers = {} 

499 headers = self.endpoint.get("headers", {}) 

500 

501 for header, values in headers.items(): 

502 resolved_headers[header] = [ 

503 rule_lib.resolve_value(item, scope_vars) for item in values 

504 ] 

505 return resolved_headers 

506 

507 

508class ErrorRule(BaseRule): 

509 def __init__(self, error, **kwargs): 

510 super().__init__(**kwargs) 

511 self.error = error 

512 

513 def evaluate(self, scope_vars, rule_lib): 

514 """If an error rule's conditions are met, raise an error rule. 

515 

516 :type scope_vars: dict 

517 :type rule_lib: RuleSetStandardLibrary 

518 :rtype: EndpointResolutionError 

519 """ 

520 if self.evaluate_conditions(scope_vars, rule_lib): 

521 error = rule_lib.resolve_value(self.error, scope_vars) 

522 raise EndpointResolutionError(msg=error) 

523 return None 

524 

525 

526class TreeRule(BaseRule): 

527 """A tree rule is non-terminal meaning it will never be returned to a provider. 

528 Additionally this means it has no attributes that need to be resolved. 

529 """ 

530 

531 def __init__(self, rules, **kwargs): 

532 super().__init__(**kwargs) 

533 self.rules = [RuleCreator.create(**rule) for rule in rules] 

534 

535 def evaluate(self, scope_vars, rule_lib): 

536 """If a tree rule's conditions are met, iterate its sub-rules 

537 and return first result found. 

538 

539 :type scope_vars: dict 

540 :type rule_lib: RuleSetStandardLibrary 

541 :rtype: RuleSetEndpoint/EndpointResolutionError 

542 """ 

543 if self.evaluate_conditions(scope_vars, rule_lib): 

544 for rule in self.rules: 

545 # don't share scope_vars between rules 

546 rule_result = rule.evaluate(scope_vars.copy(), rule_lib) 

547 if rule_result: 

548 return rule_result 

549 return None 

550 

551 

552class RuleCreator: 

553 endpoint = EndpointRule 

554 error = ErrorRule 

555 tree = TreeRule 

556 

557 @classmethod 

558 def create(cls, **kwargs): 

559 """Create a rule instance from metadata. 

560 

561 :rtype: TreeRule/EndpointRule/ErrorRule 

562 """ 

563 rule_type = kwargs.pop("type") 

564 try: 

565 rule_class = getattr(cls, rule_type) 

566 except AttributeError: 

567 raise EndpointResolutionError( 

568 msg=f"Unknown rule type: {rule_type}. A rule must " 

569 "be of type tree, endpoint or error." 

570 ) 

571 else: 

572 return rule_class(**kwargs) 

573 

574 

575class ParameterType(Enum): 

576 """Translation from `type` attribute to native Python type.""" 

577 

578 string = str 

579 boolean = bool 

580 

581 

582class ParameterDefinition: 

583 """The spec of an individual parameter defined in a RuleSet.""" 

584 

585 def __init__( 

586 self, 

587 name, 

588 parameter_type, 

589 documentation=None, 

590 builtIn=None, 

591 default=None, 

592 required=None, 

593 deprecated=None, 

594 ): 

595 self.name = name 

596 try: 

597 self.parameter_type = getattr( 

598 ParameterType, parameter_type.lower() 

599 ).value 

600 except AttributeError: 

601 raise EndpointResolutionError( 

602 msg=f"Unknown parameter type: {parameter_type}. " 

603 "A parameter must be of type string or boolean." 

604 ) 

605 self.documentation = documentation 

606 self.builtin = builtIn 

607 self.default = default 

608 self.required = required 

609 self.deprecated = deprecated 

610 

611 def validate_input(self, value): 

612 """Perform base validation on parameter input. 

613 

614 :type value: Any 

615 :raises: EndpointParametersError 

616 """ 

617 

618 if not isinstance(value, self.parameter_type): 

619 raise EndpointResolutionError( 

620 msg=f"Value ({self.name}) is the wrong " 

621 f"type. Must be {self.parameter_type}." 

622 ) 

623 if self.deprecated is not None: 

624 depr_str = f"{self.name} has been deprecated." 

625 msg = self.deprecated.get("message") 

626 since = self.deprecated.get("since") 

627 if msg: 

628 depr_str += f"\n{msg}" 

629 if since: 

630 depr_str += f"\nDeprecated since {since}." 

631 logger.info(depr_str) 

632 

633 return None 

634 

635 def process_input(self, value): 

636 """Process input against spec, applying default if value is None.""" 

637 if value is None: 

638 if self.default is not None: 

639 return self.default 

640 if self.required: 

641 raise EndpointResolutionError( 

642 f"Cannot find value for required parameter {self.name}" 

643 ) 

644 # in all other cases, the parameter will keep the value None 

645 else: 

646 self.validate_input(value) 

647 return value 

648 

649 

650class RuleSet: 

651 """Collection of rules to derive a routable service endpoint.""" 

652 

653 def __init__( 

654 self, version, parameters, rules, partitions, documentation=None 

655 ): 

656 self.version = version 

657 self.parameters = self._ingest_parameter_spec(parameters) 

658 self.rules = [RuleCreator.create(**rule) for rule in rules] 

659 self.rule_lib = RuleSetStandardLibrary(partitions) 

660 self.documentation = documentation 

661 

662 def _ingest_parameter_spec(self, parameters): 

663 return { 

664 name: ParameterDefinition( 

665 name, 

666 spec["type"], 

667 spec.get("documentation"), 

668 spec.get("builtIn"), 

669 spec.get("default"), 

670 spec.get("required"), 

671 spec.get("deprecated"), 

672 ) 

673 for name, spec in parameters.items() 

674 } 

675 

676 def process_input_parameters(self, input_params): 

677 """Process each input parameter against its spec. 

678 

679 :type input_params: dict 

680 """ 

681 for name, spec in self.parameters.items(): 

682 value = spec.process_input(input_params.get(name)) 

683 if value is not None: 

684 input_params[name] = value 

685 return None 

686 

687 def evaluate(self, input_parameters): 

688 """Evaluate input parameters against rules returning first match. 

689 

690 :type input_parameters: dict 

691 """ 

692 self.process_input_parameters(input_parameters) 

693 for rule in self.rules: 

694 evaluation = rule.evaluate(input_parameters.copy(), self.rule_lib) 

695 if evaluation is not None: 

696 return evaluation 

697 return None 

698 

699 

700class EndpointProvider: 

701 """Derives endpoints from a RuleSet for given input parameters.""" 

702 

703 def __init__(self, ruleset_data, partition_data): 

704 self.ruleset = RuleSet(**ruleset_data, partitions=partition_data) 

705 

706 @lru_cache_weakref(maxsize=CACHE_SIZE) 

707 def resolve_endpoint(self, **input_parameters): 

708 """Match input parameters to a rule. 

709 

710 :type input_parameters: dict 

711 :rtype: RuleSetEndpoint 

712 """ 

713 params_for_error = input_parameters.copy() 

714 endpoint = self.ruleset.evaluate(input_parameters) 

715 if endpoint is None: 

716 param_string = "\n".join( 

717 [f"{key}: {value}" for key, value in params_for_error.items()] 

718 ) 

719 raise EndpointResolutionError( 

720 msg=f"No endpoint found for parameters:\n{param_string}" 

721 ) 

722 return endpoint