Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/endpoint_provider.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

306 statements  

1# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"). You 

4# may not use this file except in compliance with the License. A copy of 

5# the License is located at 

6# 

7# http://aws.amazon.com/apache2.0/ 

8# 

9# or in the "license" file accompanying this file. This file is 

10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 

11# ANY KIND, either express or implied. See the License for the specific 

12# language governing permissions and limitations under the License. 

13 

14""" 

15NOTE: All classes and functions in this module are considered private and are 

16subject to abrupt breaking changes. Please do not use them directly. 

17 

18To view the raw JSON that the objects in this module represent, please 

19go to any `endpoint-rule-set.json` file in /botocore/data/<service>/<api version>/ 

20or you can look at the test files in /tests/unit/data/endpoints/valid-rules/ 

21""" 

22 

23import logging 

24import re 

25from enum import Enum 

26from string import Formatter 

27from typing import NamedTuple 

28 

29from botocore import xform_name 

30from botocore.compat import IPV4_RE, quote, urlparse 

31from botocore.exceptions import EndpointResolutionError 

32from botocore.utils import ( 

33 ArnParser, 

34 InvalidArnException, 

35 is_valid_ipv4_endpoint_url, 

36 is_valid_ipv6_endpoint_url, 

37 lru_cache_weakref, 

38 normalize_url_path, 

39 percent_encode, 

40) 

41 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches a `{...}` placeholder made only of ASCII letters and `#`.
TEMPLATE_STRING_RE = re.compile(r"\{[a-zA-Z#]+\}")
# Captures a name followed by a bracketed integer index, e.g. `name[0]`.
GET_ATTR_RE = re.compile(r"(\w+)\[(\d+)\]")
# 1-63 letters, digits or hyphens, with no hyphen at either end.
VALID_HOST_LABEL_RE = re.compile(r"^(?!-)[a-zA-Z\d-]{1,63}(?<!-)$")
# Passed as `maxsize` to the cache decorating
# `EndpointProvider.resolve_endpoint`.
CACHE_SIZE = 100

# Shared ARN parser used by `RuleSetStandardLibrary.aws_parse_arn`.
ARN_PARSER = ArnParser()
# Shared formatter; its `parse()` is used to split template strings into
# literal text and field references.
STRING_FORMATTER = Formatter()

52 

53 

class RuleSetStandardLibrary:
    """Rule actions to be performed by the EndpointProvider."""

    def __init__(self, partitions_data):
        """Store the partition metadata consulted by `aws_partition`.

        :type partitions_data: dict
        """
        self.partitions_data = partitions_data

    def is_func(self, argument):
        """Determine if an object is a function object.

        :type argument: Any
        :rtype: bool
        """
        return isinstance(argument, dict) and "fn" in argument

    def is_ref(self, argument):
        """Determine if an object is a reference object.

        :type argument: Any
        :rtype: bool
        """
        return isinstance(argument, dict) and "ref" in argument

    def is_template(self, argument):
        """Determine if an object contains a template string.

        :type argument: Any
        :rtype: bool
        """
        return (
            isinstance(argument, str)
            and TEMPLATE_STRING_RE.search(argument) is not None
        )

    def resolve_template_string(self, value, scope_vars):
        """Resolve and inject values into a template string.

        Each `{a#b}` reference is looked up in `scope_vars` one `#`-separated
        key at a time, so a missing key raises ``KeyError``.

        :type value: str
        :type scope_vars: dict
        :rtype: str
        """
        pieces = []
        for literal, reference, _, _ in STRING_FORMATTER.parse(value):
            pieces.append(literal)
            if reference is not None:
                template_value = scope_vars
                for param in reference.split("#"):
                    template_value = template_value[param]
                pieces.append(f"{template_value}")
        # join once instead of growing a string with repeated `+=`
        return "".join(pieces)

    def resolve_value(self, value, scope_vars):
        """Return evaluated value based on type.

        Function objects are called, reference objects are looked up in
        `scope_vars` (``None`` when absent), template strings are
        interpolated, and anything else is returned unchanged.

        :type value: Any
        :type scope_vars: dict
        :rtype: Any
        """
        if self.is_func(value):
            return self.call_function(value, scope_vars)
        elif self.is_ref(value):
            return scope_vars.get(value["ref"])
        elif self.is_template(value):
            return self.resolve_template_string(value, scope_vars)

        return value

    def convert_func_name(self, value):
        """Normalize function names.

        The name is passed through `xform_name`, `not` is prefixed with an
        underscore because it is a Python keyword, and `.` becomes `_`.

        :type value: str
        :rtype: str
        """
        normalized_name = xform_name(value)
        if normalized_name == "not":
            normalized_name = f"_{normalized_name}"
        return normalized_name.replace(".", "_")

    def call_function(self, func_signature, scope_vars):
        """Call the function with the resolved arguments and assign to
        `scope_vars` when applicable.

        :type func_signature: dict
        :type scope_vars: dict
        :rtype: Any
        :raises EndpointResolutionError: if the `assign` target is already
            present in `scope_vars`
        """
        func_args = [
            self.resolve_value(arg, scope_vars)
            for arg in func_signature["argv"]
        ]
        func_name = self.convert_func_name(func_signature["fn"])
        func = getattr(self, func_name)
        result = func(*func_args)
        if "assign" in func_signature:
            assign = func_signature["assign"]
            if assign in scope_vars:
                raise EndpointResolutionError(
                    msg=f"Assignment {assign} already exists in "
                    "scoped variables and cannot be overwritten"
                )
            scope_vars[assign] = result
        return result

    def is_set(self, value):
        """Evaluates whether a value is set.

        :type value: Any
        :rtype: bool
        """
        return value is not None

    def get_attr(self, value, path):
        """Find an attribute within a value given a path string. The path can
        contain the name of the attribute and an index in brackets. A period
        separating attribute names indicates the one to the right is nested.
        The index will always occur at the end of the path.

        :type value: dict or list
        :type path: str
        :rtype: Any
        """
        for part in path.split("."):
            match = GET_ATTR_RE.search(part)
            if match is not None:
                name, index = match.groups()
                index = int(index)
                value = value.get(name)
                # a missing attribute or out-of-range index yields None
                if value is None or index >= len(value):
                    return None
                return value[index]
            else:
                value = value[part]
        return value

    def format_partition_output(self, partition):
        """Build the result dict for a matched partition.

        Returns a copy of the partition's `outputs` with the partition `id`
        added under `name`, leaving the stored partition data unmodified.

        :type partition: dict
        :rtype: dict
        """
        return {**partition["outputs"], "name": partition["id"]}

    def is_partition_match(self, region, partition):
        """Check whether a region belongs to a partition, either by being
        listed in its `regions` or by matching its `regionRegex`.

        :type region: str
        :type partition: dict
        :rtype: bool
        """
        # explicit listing is checked first so the regex runs only if needed
        return (
            region in partition["regions"]
            or re.match(partition["regionRegex"], region) is not None
        )

    def aws_partition(self, value):
        """Match a region string to an AWS partition.

        :type value: str
        :rtype: dict
        """
        partitions = self.partitions_data['partitions']

        if value is not None:
            for partition in partitions:
                if self.is_partition_match(value, partition):
                    return self.format_partition_output(partition)

        # return the default partition if no matches were found
        aws_partition = partitions[0]
        return self.format_partition_output(aws_partition)

    def aws_parse_arn(self, value):
        """Parse and validate string for ARN components.

        Returns ``None`` when the value is missing, does not start with
        `arn:`, cannot be parsed, or lacks a partition, service or resource.

        :type value: str
        :rtype: dict
        """
        if value is None or not value.startswith("arn:"):
            return None

        try:
            arn_dict = ARN_PARSER.parse_arn(value)
        except InvalidArnException:
            return None

        # partition, resource, and service are required
        if not all(
            (arn_dict["partition"], arn_dict["service"], arn_dict["resource"])
        ):
            return None

        arn_dict["accountId"] = arn_dict.pop("account")

        # the resource is split on both `:` and `/`
        resource = arn_dict.pop("resource")
        arn_dict["resourceId"] = resource.replace(":", "/").split("/")

        return arn_dict

    def is_valid_host_label(self, value, allow_subdomains):
        """Evaluates whether a value is a valid host label per
        RFC 1123. If allow_subdomains is True, split on `.` and validate
        each component separately.

        :type value: str
        :type allow_subdomains: bool
        :rtype: bool
        """
        if value is None or allow_subdomains is False and value.count(".") > 0:
            return False

        if allow_subdomains is True:
            return all(
                self.is_valid_host_label(label, False)
                for label in value.split(".")
            )

        return VALID_HOST_LABEL_RE.match(value) is not None

    def string_equals(self, value1, value2):
        """Evaluates two string values for equality.

        :type value1: str
        :type value2: str
        :rtype: bool
        :raises EndpointResolutionError: if either value is not a string
        """
        if not all(isinstance(val, str) for val in (value1, value2)):
            msg = f"Both values must be strings, not {type(value1)} and {type(value2)}."
            raise EndpointResolutionError(msg=msg)
        return value1 == value2

    def uri_encode(self, value):
        """Perform percent-encoding on an input string.

        :type value: str
        :rtype: str
        """
        if value is None:
            return None

        return percent_encode(value)

    def parse_url(self, value):
        """Parse a URL string into components.

        Returns ``None`` for a missing value, an invalid port, a scheme other
        than http/https, or a URL carrying a query string.

        :type value: str
        :rtype: dict
        """
        if value is None:
            return None

        url_components = urlparse(value)
        try:
            # url_parse may assign non-integer values to
            # `port` and will fail when accessed.
            url_components.port
        except ValueError:
            return None

        scheme = url_components.scheme
        query = url_components.query
        # URLs with queries are not supported
        if scheme not in ("https", "http") or len(query) > 0:
            return None

        path = url_components.path
        normalized_path = quote(normalize_url_path(path))
        if not normalized_path.endswith("/"):
            normalized_path = f"{normalized_path}/"

        return {
            "scheme": scheme,
            "authority": url_components.netloc,
            "path": path,
            "normalizedPath": normalized_path,
            "isIp": is_valid_ipv4_endpoint_url(value)
            or is_valid_ipv6_endpoint_url(value),
        }

    def boolean_equals(self, value1, value2):
        """Evaluates two boolean values for equality.

        :type value1: bool
        :type value2: bool
        :rtype: bool
        :raises EndpointResolutionError: if either value is not a bool
        """
        if not all(isinstance(val, bool) for val in (value1, value2)):
            msg = f"Both arguments must be bools, not {type(value1)} and {type(value2)}."
            raise EndpointResolutionError(msg=msg)
        return value1 is value2

    def is_ascii(self, value):
        """Evaluates if a string only contains ASCII characters.

        :type value: str
        :rtype: bool
        """
        try:
            value.encode("ascii")
            return True
        except UnicodeEncodeError:
            return False

    def substring(self, value, start, stop, reverse):
        """Computes a substring given the start index and end index. If
        `reverse` is True, slice the string from the end instead.

        Returns ``None`` when `start >= stop`, when `stop` exceeds the length
        of the string, or when the string is not ASCII.

        :type value: str
        :type start: int
        :type stop: int
        :type reverse: bool
        :rtype: str
        :raises EndpointResolutionError: if `value` is not a string
        """
        if not isinstance(value, str):
            msg = f"Input must be a string, not {type(value)}."
            raise EndpointResolutionError(msg=msg)
        if start >= stop or len(value) < stop or not self.is_ascii(value):
            return None

        if reverse is True:
            # mirror the indices so they count from the end of the string
            r_start = len(value) - stop
            r_stop = len(value) - start
            return value[r_start:r_stop]

        return value[start:stop]

    def _not(self, value):
        """A function implementation of the logical operator `not`.

        :type value: Any
        :rtype: bool
        """
        return not value

    def aws_is_virtual_hostable_s3_bucket(self, value, allow_subdomains):
        """Evaluates whether a value is a valid bucket name for virtual host
        style bucket URLs. To pass, the value must meet the following
        criteria:
        1. is_valid_host_label(value) is True (each label is 1-63 characters)
        2. is at least 3 characters long
        3. does not contain uppercase characters
        4. is not formatted as an IP address

        If allow_subdomains is True, split on `.` and validate
        each component separately.

        :type value: str
        :type allow_subdomains: bool
        :rtype: bool
        """
        if (
            value is None
            or len(value) < 3
            or value.lower() != value
            or IPV4_RE.match(value) is not None
        ):
            return False

        return self.is_valid_host_label(
            value, allow_subdomains=allow_subdomains
        )


# maintains backwards compatibility as `Library` was misspelled
# in earlier versions
RuleSetStandardLibary = RuleSetStandardLibrary

408 

409 

class BaseRule:
    """Common interface shared by every endpoint rule type."""

    def __init__(self, conditions, documentation=None):
        self.documentation = documentation
        self.conditions = conditions

    def evaluate(self, scope_vars, rule_lib):
        """Evaluate the rule; concrete rule types must override this."""
        raise NotImplementedError()

    def evaluate_conditions(self, scope_vars, rule_lib):
        """Report whether every condition attached to this rule passes.

        Conditions are called in order through `rule_lib.call_function`;
        evaluation stops at the first one returning ``False`` or ``None``.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: bool
        """
        outcomes = (
            rule_lib.call_function(condition, scope_vars)
            for condition in self.conditions
        )
        # identity checks on purpose: other falsy results still count as met
        return not any(
            outcome is None or outcome is False for outcome in outcomes
        )

432 

433 

class RuleSetEndpoint(NamedTuple):
    """A resolved endpoint object returned by a rule."""

    # resolved endpoint URL
    url: str
    # resolved endpoint properties
    properties: dict
    # resolved headers, keyed by header name
    headers: dict

440 

441 

class EndpointRule(BaseRule):
    """Rule that produces a resolved endpoint when its conditions are met."""

    def __init__(self, endpoint, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint

    def evaluate(self, scope_vars, rule_lib):
        """Resolve this rule's endpoint if its conditions are met.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: RuleSetEndpoint
        :returns: the resolved endpoint, or ``None`` when a condition fails
        """
        if not self.evaluate_conditions(scope_vars, rule_lib):
            return None

        resolved_url = rule_lib.resolve_value(self.endpoint["url"], scope_vars)
        raw_properties = self.endpoint.get("properties", {})
        resolved_properties = self.resolve_properties(
            raw_properties, scope_vars, rule_lib
        )
        resolved_headers = self.resolve_headers(scope_vars, rule_lib)
        return RuleSetEndpoint(
            url=resolved_url,
            properties=resolved_properties,
            headers=resolved_headers,
        )

    def resolve_properties(self, properties, scope_vars, rule_lib):
        """Recursively walk `properties`, interpolating template strings.

        Lists and dicts are rebuilt with each element resolved; any other
        non-template value is returned as-is.

        :type properties: dict/list/str
        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: dict
        """
        if isinstance(properties, list):
            resolved_items = []
            for item in properties:
                resolved_items.append(
                    self.resolve_properties(item, scope_vars, rule_lib)
                )
            return resolved_items

        if isinstance(properties, dict):
            resolved_map = {}
            for name, item in properties.items():
                resolved_map[name] = self.resolve_properties(
                    item, scope_vars, rule_lib
                )
            return resolved_map

        if rule_lib.is_template(properties):
            return rule_lib.resolve_template_string(properties, scope_vars)

        return properties

    def resolve_headers(self, scope_vars, rule_lib):
        """Resolve every value of every header declared on the endpoint.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: dict
        """
        declared_headers = self.endpoint.get("headers", {})
        return {
            header_name: [
                rule_lib.resolve_value(entry, scope_vars) for entry in entries
            ]
            for header_name, entries in declared_headers.items()
        }

505 

506 

class ErrorRule(BaseRule):
    """Rule that raises an error when its conditions are met."""

    def __init__(self, error, **kwargs):
        super().__init__(**kwargs)
        self.error = error

    def evaluate(self, scope_vars, rule_lib):
        """Raise the rule's resolved error message if its conditions are met.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :returns: ``None`` when a condition is not met
        :raises EndpointResolutionError: when all conditions are met
        """
        if not self.evaluate_conditions(scope_vars, rule_lib):
            return None

        message = rule_lib.resolve_value(self.error, scope_vars)
        raise EndpointResolutionError(msg=message)

523 

524 

class TreeRule(BaseRule):
    """A non-terminal rule: it is never returned to a provider itself and
    has no attributes of its own that need resolving.
    """

    def __init__(self, rules, **kwargs):
        super().__init__(**kwargs)
        sub_rules = []
        for rule_spec in rules:
            sub_rules.append(RuleCreator.create(**rule_spec))
        self.rules = sub_rules

    def evaluate(self, scope_vars, rule_lib):
        """Evaluate sub-rules in order once this rule's conditions are met,
        returning the first truthy result.

        :type scope_vars: dict
        :type rule_lib: RuleSetStandardLibrary
        :rtype: RuleSetEndpoint/EndpointResolutionError
        """
        if not self.evaluate_conditions(scope_vars, rule_lib):
            return None

        for sub_rule in self.rules:
            # each sub-rule works on its own copy so that its assignments
            # are not visible to its siblings
            outcome = sub_rule.evaluate(scope_vars.copy(), rule_lib)
            if outcome:
                return outcome
        return None

549 

550 

class RuleCreator:
    """Factory that maps a rule's `type` string to the matching rule class.

    Each supported type is exposed as a class attribute named after the type.
    """

    endpoint = EndpointRule
    error = ErrorRule
    tree = TreeRule

    @classmethod
    def create(cls, **kwargs):
        """Create a rule instance from metadata.

        The `type` entry selects the rule class; every remaining keyword is
        forwarded to that class's constructor.

        :rtype: TreeRule/EndpointRule/ErrorRule
        :raises KeyError: if `type` is missing from the metadata
        :raises EndpointResolutionError: if `type` does not name a rule class
        """
        rule_type = kwargs.pop("type")
        rule_class = (
            getattr(cls, rule_type, None)
            if isinstance(rule_type, str)
            else None
        )
        # Only attributes that are rule classes count as rule types; other
        # attributes (such as `create` itself or dunder names) must not be
        # reachable through the `type` field.
        if not (
            isinstance(rule_class, type) and issubclass(rule_class, BaseRule)
        ):
            raise EndpointResolutionError(
                msg=f"Unknown rule type: {rule_type}. A rule must "
                "be of type tree, endpoint or error."
            )
        return rule_class(**kwargs)

572 

573 

class ParameterType(Enum):
    """Maps a ruleset parameter `type` name to the native Python type."""

    # member names are the lowercase type names
    string = str
    boolean = bool

579 

580 

class ParameterDefinition:
    """The spec of an individual parameter defined in a RuleSet."""

    def __init__(
        self,
        name,
        parameter_type,
        documentation=None,
        builtIn=None,
        default=None,
        required=None,
        deprecated=None,
    ):
        """Store the parameter spec, translating its type name.

        :type name: str
        :type parameter_type: str
        :raises EndpointResolutionError: if `parameter_type` does not name a
            member of `ParameterType` (compared case-insensitively)
        """
        self.name = name
        try:
            self.parameter_type = getattr(
                ParameterType, parameter_type.lower()
            ).value
        except AttributeError:
            raise EndpointResolutionError(
                msg=f"Unknown parameter type: {parameter_type}. "
                "A parameter must be of type string or boolean."
            )
        self.documentation = documentation
        self.builtin = builtIn
        self.default = default
        self.required = required
        self.deprecated = deprecated

    def validate_input(self, value):
        """Perform base validation on parameter input.

        Also logs a deprecation notice at INFO level when the parameter is
        marked as deprecated.

        :type value: Any
        :raises EndpointResolutionError: if `value` is not an instance of the
            parameter's type
        """

        if not isinstance(value, self.parameter_type):
            raise EndpointResolutionError(
                msg=f"Value ({self.name}) is the wrong "
                f"type. Must be {self.parameter_type}."
            )
        if self.deprecated is not None:
            depr_str = f"{self.name} has been deprecated."
            msg = self.deprecated.get("message")
            since = self.deprecated.get("since")
            if msg:
                depr_str += f"\n{msg}"
            if since:
                depr_str += f"\nDeprecated since {since}."
            logger.info(depr_str)

        return None

    def process_input(self, value):
        """Process input against spec, applying default if value is None.

        :type value: Any
        :rtype: Any
        :raises EndpointResolutionError: if the value is missing for a
            required parameter without a default, or has the wrong type
        """
        if value is None:
            if self.default is not None:
                return self.default
            if self.required:
                # The message must be passed as `msg=`, like every other
                # EndpointResolutionError in this module; a positional
                # argument would fail with TypeError instead.
                raise EndpointResolutionError(
                    msg=f"Cannot find value for required parameter {self.name}"
                )
            # in all other cases, the parameter will keep the value None
        else:
            self.validate_input(value)
        return value

647 

648 

class RuleSet:
    """Collection of rules to derive a routable service endpoint."""

    def __init__(
        self, version, parameters, rules, partitions, documentation=None
    ):
        self.version = version
        self.parameters = self._ingest_parameter_spec(parameters)
        self.rules = [RuleCreator.create(**rule_spec) for rule_spec in rules]
        self.rule_lib = RuleSetStandardLibrary(partitions)
        self.documentation = documentation

    def _ingest_parameter_spec(self, parameters):
        """Build a `ParameterDefinition` for every declared parameter.

        :type parameters: dict
        :rtype: dict
        """
        definitions = {}
        for param_name, param_spec in parameters.items():
            definitions[param_name] = ParameterDefinition(
                name=param_name,
                parameter_type=param_spec["type"],
                documentation=param_spec.get("documentation"),
                builtIn=param_spec.get("builtIn"),
                default=param_spec.get("default"),
                required=param_spec.get("required"),
                deprecated=param_spec.get("deprecated"),
            )
        return definitions

    def process_input_parameters(self, input_params):
        """Process each input parameter against its spec.

        `input_params` is updated in place: any parameter whose processed
        value is not ``None`` is written back under its name.

        :type input_params: dict
        """
        for param_name, definition in self.parameters.items():
            supplied = input_params.get(param_name)
            processed = definition.process_input(supplied)
            if processed is None:
                continue
            input_params[param_name] = processed
        return None

    def evaluate(self, input_parameters):
        """Evaluate input parameters against rules returning first match.

        :type input_parameters: dict
        :returns: the first non-``None`` rule result, or ``None`` if no rule
            produced one
        """
        self.process_input_parameters(input_parameters)
        for candidate in self.rules:
            # every top-level rule starts from its own copy of the parameters
            outcome = candidate.evaluate(
                input_parameters.copy(), self.rule_lib
            )
            if outcome is not None:
                return outcome
        return None

697 

698 

class EndpointProvider:
    """Derives endpoints from a RuleSet for given input parameters."""

    def __init__(self, ruleset_data, partition_data):
        self.ruleset = RuleSet(**ruleset_data, partitions=partition_data)

    @lru_cache_weakref(maxsize=CACHE_SIZE)
    def resolve_endpoint(self, **input_parameters):
        """Match input parameters to a rule.

        :type input_parameters: dict
        :rtype: RuleSetEndpoint
        :raises EndpointResolutionError: if no rule yields an endpoint
        """
        # snapshot taken before evaluation, which may modify the parameters,
        # so the error message reports what the caller actually supplied
        supplied_params = input_parameters.copy()
        resolved = self.ruleset.evaluate(input_parameters)
        if resolved is not None:
            return resolved

        listing = "\n".join(
            f"{name}: {supplied}" for name, supplied in supplied_params.items()
        )
        raise EndpointResolutionError(
            msg=f"No endpoint found for parameters:\n{listing}"
        )

721 return endpoint