Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/botocore/endpoint_provider.py: 27%
306 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"). You
4# may not use this file except in compliance with the License. A copy of
5# the License is located at
6#
7# http://aws.amazon.com/apache2.0/
8#
9# or in the "license" file accompanying this file. This file is
10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11# ANY KIND, either express or implied. See the License for the specific
12# language governing permissions and limitations under the License.
14"""
15NOTE: All classes and functions in this module are considered private and are
16subject to abrupt breaking changes. Please do not use them directly.
18To view the raw JSON that the objects in this module represent, please
19go to any `endpoint-rule-set.json` file in /botocore/data/<service>/<api version>/
20or you can look at the test files in /tests/unit/data/endpoints/valid-rules/
21"""
24import logging
25import re
26from enum import Enum
27from string import Formatter
28from typing import NamedTuple
30from botocore import xform_name
31from botocore.compat import IPV4_RE, quote, urlparse
32from botocore.exceptions import EndpointResolutionError
33from botocore.utils import (
34 ArnParser,
35 InvalidArnException,
36 is_valid_ipv4_endpoint_url,
37 is_valid_ipv6_endpoint_url,
38 lru_cache_weakref,
39 normalize_url_path,
40 percent_encode,
41)
43logger = logging.getLogger(__name__)
45TEMPLATE_STRING_RE = re.compile(r"\{[a-zA-Z#]+\}")
46GET_ATTR_RE = re.compile(r"(\w+)\[(\d+)\]")
47VALID_HOST_LABEL_RE = re.compile(
48 r"^(?!-)[a-zA-Z\d-]{1,63}(?<!-)$",
49)
50CACHE_SIZE = 100
51ARN_PARSER = ArnParser()
52STRING_FORMATTER = Formatter()
55class RuleSetStandardLibrary:
56 """Rule actions to be performed by the EndpointProvider."""
58 def __init__(self, partitions_data):
59 self.partitions_data = partitions_data
61 def is_func(self, argument):
62 """Determine if an object is a function object.
64 :type argument: Any
65 :rtype: bool
66 """
67 return isinstance(argument, dict) and "fn" in argument
69 def is_ref(self, argument):
70 """Determine if an object is a reference object.
72 :type argument: Any
73 :rtype: bool
74 """
75 return isinstance(argument, dict) and "ref" in argument
77 def is_template(self, argument):
78 """Determine if an object contains a template string.
80 :type argument: Any
81 :rtpe: bool
82 """
83 return (
84 isinstance(argument, str)
85 and TEMPLATE_STRING_RE.search(argument) is not None
86 )
88 def resolve_template_string(self, value, scope_vars):
89 """Resolve and inject values into a template string.
91 :type value: str
92 :type scope_vars: dict
93 :rtype: str
94 """
95 result = ""
96 for literal, reference, _, _ in STRING_FORMATTER.parse(value):
97 if reference is not None:
98 template_value = scope_vars
99 template_params = reference.split("#")
100 for param in template_params:
101 template_value = template_value[param]
102 result += f"{literal}{template_value}"
103 else:
104 result += literal
105 return result
107 def resolve_value(self, value, scope_vars):
108 """Return evaluated value based on type.
110 :type value: Any
111 :type scope_vars: dict
112 :rtype: Any
113 """
114 if self.is_func(value):
115 return self.call_function(value, scope_vars)
116 elif self.is_ref(value):
117 return scope_vars.get(value["ref"])
118 elif self.is_template(value):
119 return self.resolve_template_string(value, scope_vars)
121 return value
123 def convert_func_name(self, value):
124 """Normalize function names.
126 :type value: str
127 :rtype: str
128 """
129 normalized_name = f"{xform_name(value)}"
130 if normalized_name == "not":
131 normalized_name = f"_{normalized_name}"
132 return normalized_name.replace(".", "_")
134 def call_function(self, func_signature, scope_vars):
135 """Call the function with the resolved arguments and assign to `scope_vars`
136 when applicable.
138 :type func_signature: dict
139 :type scope_vars: dict
140 :rtype: Any
141 """
142 func_args = [
143 self.resolve_value(arg, scope_vars)
144 for arg in func_signature["argv"]
145 ]
146 func_name = self.convert_func_name(func_signature["fn"])
147 func = getattr(self, func_name)
148 result = func(*func_args)
149 if "assign" in func_signature:
150 assign = func_signature["assign"]
151 if assign in scope_vars:
152 raise EndpointResolutionError(
153 msg=f"Assignment {assign} already exists in "
154 "scoped variables and cannot be overwritten"
155 )
156 scope_vars[assign] = result
157 return result
159 def is_set(self, value):
160 """Evaluates whether a value is set.
162 :type value: Any
163 :rytpe: bool
164 """
165 return value is not None
167 def get_attr(self, value, path):
168 """Find an attribute within a value given a path string. The path can contain
169 the name of the attribute and an index in brackets. A period separating attribute
170 names indicates the one to the right is nested. The index will always occur at
171 the end of the path.
173 :type value: dict or list
174 :type path: str
175 :rtype: Any
176 """
177 for part in path.split("."):
178 match = GET_ATTR_RE.search(part)
179 if match is not None:
180 name, index = match.groups()
181 index = int(index)
182 value = value.get(name)
183 if value is None or index >= len(value):
184 return None
185 return value[index]
186 else:
187 value = value[part]
188 return value
190 def format_partition_output(self, partition):
191 output = partition["outputs"]
192 output["name"] = partition["id"]
193 return output
195 def is_partition_match(self, region, partition):
196 matches_regex = re.match(partition["regionRegex"], region) is not None
197 return region in partition["regions"] or matches_regex
199 def aws_partition(self, value):
200 """Match a region string to an AWS partition.
202 :type value: str
203 :rtype: dict
204 """
205 partitions = self.partitions_data['partitions']
207 if value is not None:
208 for partition in partitions:
209 if self.is_partition_match(value, partition):
210 return self.format_partition_output(partition)
212 # return the default partition if no matches were found
213 aws_partition = partitions[0]
214 return self.format_partition_output(aws_partition)
216 def aws_parse_arn(self, value):
217 """Parse and validate string for ARN components.
219 :type value: str
220 :rtype: dict
221 """
222 if value is None or not value.startswith("arn:"):
223 return None
225 try:
226 arn_dict = ARN_PARSER.parse_arn(value)
227 except InvalidArnException:
228 return None
230 # partition, resource, and service are required
231 if not all(
232 (arn_dict["partition"], arn_dict["service"], arn_dict["resource"])
233 ):
234 return None
236 arn_dict["accountId"] = arn_dict.pop("account")
238 resource = arn_dict.pop("resource")
239 arn_dict["resourceId"] = resource.replace(":", "/").split("/")
241 return arn_dict
243 def is_valid_host_label(self, value, allow_subdomains):
244 """Evaluates whether a value is a valid host label per
245 RFC 1123. If allow_subdomains is True, split on `.` and validate
246 each component separately.
248 :type value: str
249 :type allow_subdomains: bool
250 :rtype: bool
251 """
252 if value is None or allow_subdomains is False and value.count(".") > 0:
253 return False
255 if allow_subdomains is True:
256 return all(
257 self.is_valid_host_label(label, False)
258 for label in value.split(".")
259 )
261 return VALID_HOST_LABEL_RE.match(value) is not None
263 def string_equals(self, value1, value2):
264 """Evaluates two string values for equality.
266 :type value1: str
267 :type value2: str
268 :rtype: bool
269 """
270 if not all(isinstance(val, str) for val in (value1, value2)):
271 msg = f"Both values must be strings, not {type(value1)} and {type(value2)}."
272 raise EndpointResolutionError(msg=msg)
273 return value1 == value2
275 def uri_encode(self, value):
276 """Perform percent-encoding on an input string.
278 :type value: str
279 :rytpe: str
280 """
281 if value is None:
282 return None
284 return percent_encode(value)
286 def parse_url(self, value):
287 """Parse a URL string into components.
289 :type value: str
290 :rtype: dict
291 """
292 if value is None:
293 return None
295 url_components = urlparse(value)
296 try:
297 # url_parse may assign non-integer values to
298 # `port` and will fail when accessed.
299 url_components.port
300 except ValueError:
301 return None
303 scheme = url_components.scheme
304 query = url_components.query
305 # URLs with queries are not supported
306 if scheme not in ("https", "http") or len(query) > 0:
307 return None
309 path = url_components.path
310 normalized_path = quote(normalize_url_path(path))
311 if not normalized_path.endswith("/"):
312 normalized_path = f"{normalized_path}/"
314 return {
315 "scheme": scheme,
316 "authority": url_components.netloc,
317 "path": path,
318 "normalizedPath": normalized_path,
319 "isIp": is_valid_ipv4_endpoint_url(value)
320 or is_valid_ipv6_endpoint_url(value),
321 }
323 def boolean_equals(self, value1, value2):
324 """Evaluates two boolean values for equality.
326 :type value1: bool
327 :type value2: bool
328 :rtype: bool
329 """
330 if not all(isinstance(val, bool) for val in (value1, value2)):
331 msg = f"Both arguments must be bools, not {type(value1)} and {type(value2)}."
332 raise EndpointResolutionError(msg=msg)
333 return value1 is value2
335 def is_ascii(self, value):
336 """Evaluates if a string only contains ASCII characters.
338 :type value: str
339 :rtype: bool
340 """
341 try:
342 value.encode("ascii")
343 return True
344 except UnicodeEncodeError:
345 return False
347 def substring(self, value, start, stop, reverse):
348 """Computes a substring given the start index and end index. If `reverse` is
349 True, slice the string from the end instead.
351 :type value: str
352 :type start: int
353 :type end: int
354 :type reverse: bool
355 :rtype: str
356 """
357 if not isinstance(value, str):
358 msg = f"Input must be a string, not {type(value)}."
359 raise EndpointResolutionError(msg=msg)
360 if start >= stop or len(value) < stop or not self.is_ascii(value):
361 return None
363 if reverse is True:
364 r_start = len(value) - stop
365 r_stop = len(value) - start
366 return value[r_start:r_stop]
368 return value[start:stop]
370 def _not(self, value):
371 """A function implementation of the logical operator `not`.
373 :type value: Any
374 :rtype: bool
375 """
376 return not value
378 def aws_is_virtual_hostable_s3_bucket(self, value, allow_subdomains):
379 """Evaluates whether a value is a valid bucket name for virtual host
380 style bucket URLs. To pass, the value must meet the following criteria:
381 1. is_valid_host_label(value) is True
382 2. length between 3 and 63 characters (inclusive)
383 3. does not contain uppercase characters
384 4. is not formatted as an IP address
386 If allow_subdomains is True, split on `.` and validate
387 each component separately.
389 :type value: str
390 :type allow_subdomains: bool
391 :rtype: bool
392 """
393 if (
394 value is None
395 or len(value) < 3
396 or value.lower() != value
397 or IPV4_RE.match(value) is not None
398 ):
399 return False
401 return self.is_valid_host_label(
402 value, allow_subdomains=allow_subdomains
403 )
406# maintains backwards compatibility as `Library` was misspelled
407# in earlier versions
408RuleSetStandardLibary = RuleSetStandardLibrary
411class BaseRule:
412 """Base interface for individual endpoint rules."""
414 def __init__(self, conditions, documentation=None):
415 self.conditions = conditions
416 self.documentation = documentation
418 def evaluate(self, scope_vars, rule_lib):
419 raise NotImplementedError()
421 def evaluate_conditions(self, scope_vars, rule_lib):
422 """Determine if all conditions in a rule are met.
424 :type scope_vars: dict
425 :type rule_lib: RuleSetStandardLibrary
426 :rtype: bool
427 """
428 for func_signature in self.conditions:
429 result = rule_lib.call_function(func_signature, scope_vars)
430 if result is False or result is None:
431 return False
432 return True
435class RuleSetEndpoint(NamedTuple):
436 """A resolved endpoint object returned by a rule."""
438 url: str
439 properties: dict
440 headers: dict
443class EndpointRule(BaseRule):
444 def __init__(self, endpoint, **kwargs):
445 super().__init__(**kwargs)
446 self.endpoint = endpoint
448 def evaluate(self, scope_vars, rule_lib):
449 """Determine if conditions are met to provide a valid endpoint.
451 :type scope_vars: dict
452 :rtype: RuleSetEndpoint
453 """
454 if self.evaluate_conditions(scope_vars, rule_lib):
455 url = rule_lib.resolve_value(self.endpoint["url"], scope_vars)
456 properties = self.resolve_properties(
457 self.endpoint.get("properties", {}),
458 scope_vars,
459 rule_lib,
460 )
461 headers = self.resolve_headers(scope_vars, rule_lib)
462 return RuleSetEndpoint(
463 url=url, properties=properties, headers=headers
464 )
466 return None
468 def resolve_properties(self, properties, scope_vars, rule_lib):
469 """Traverse `properties` attribute, resolving any template strings.
471 :type properties: dict/list/str
472 :type scope_vars: dict
473 :type rule_lib: RuleSetStandardLibrary
474 :rtype: dict
475 """
476 if isinstance(properties, list):
477 return [
478 self.resolve_properties(prop, scope_vars, rule_lib)
479 for prop in properties
480 ]
481 elif isinstance(properties, dict):
482 return {
483 key: self.resolve_properties(value, scope_vars, rule_lib)
484 for key, value in properties.items()
485 }
486 elif rule_lib.is_template(properties):
487 return rule_lib.resolve_template_string(properties, scope_vars)
489 return properties
491 def resolve_headers(self, scope_vars, rule_lib):
492 """Iterate through headers attribute resolving all values.
494 :type scope_vars: dict
495 :type rule_lib: RuleSetStandardLibrary
496 :rtype: dict
497 """
498 resolved_headers = {}
499 headers = self.endpoint.get("headers", {})
501 for header, values in headers.items():
502 resolved_headers[header] = [
503 rule_lib.resolve_value(item, scope_vars) for item in values
504 ]
505 return resolved_headers
508class ErrorRule(BaseRule):
509 def __init__(self, error, **kwargs):
510 super().__init__(**kwargs)
511 self.error = error
513 def evaluate(self, scope_vars, rule_lib):
514 """If an error rule's conditions are met, raise an error rule.
516 :type scope_vars: dict
517 :type rule_lib: RuleSetStandardLibrary
518 :rtype: EndpointResolutionError
519 """
520 if self.evaluate_conditions(scope_vars, rule_lib):
521 error = rule_lib.resolve_value(self.error, scope_vars)
522 raise EndpointResolutionError(msg=error)
523 return None
526class TreeRule(BaseRule):
527 """A tree rule is non-terminal meaning it will never be returned to a provider.
528 Additionally this means it has no attributes that need to be resolved.
529 """
531 def __init__(self, rules, **kwargs):
532 super().__init__(**kwargs)
533 self.rules = [RuleCreator.create(**rule) for rule in rules]
535 def evaluate(self, scope_vars, rule_lib):
536 """If a tree rule's conditions are met, iterate its sub-rules
537 and return first result found.
539 :type scope_vars: dict
540 :type rule_lib: RuleSetStandardLibrary
541 :rtype: RuleSetEndpoint/EndpointResolutionError
542 """
543 if self.evaluate_conditions(scope_vars, rule_lib):
544 for rule in self.rules:
545 # don't share scope_vars between rules
546 rule_result = rule.evaluate(scope_vars.copy(), rule_lib)
547 if rule_result:
548 return rule_result
549 return None
552class RuleCreator:
553 endpoint = EndpointRule
554 error = ErrorRule
555 tree = TreeRule
557 @classmethod
558 def create(cls, **kwargs):
559 """Create a rule instance from metadata.
561 :rtype: TreeRule/EndpointRule/ErrorRule
562 """
563 rule_type = kwargs.pop("type")
564 try:
565 rule_class = getattr(cls, rule_type)
566 except AttributeError:
567 raise EndpointResolutionError(
568 msg=f"Unknown rule type: {rule_type}. A rule must "
569 "be of type tree, endpoint or error."
570 )
571 else:
572 return rule_class(**kwargs)
575class ParameterType(Enum):
576 """Translation from `type` attribute to native Python type."""
578 string = str
579 boolean = bool
582class ParameterDefinition:
583 """The spec of an individual parameter defined in a RuleSet."""
585 def __init__(
586 self,
587 name,
588 parameter_type,
589 documentation=None,
590 builtIn=None,
591 default=None,
592 required=None,
593 deprecated=None,
594 ):
595 self.name = name
596 try:
597 self.parameter_type = getattr(
598 ParameterType, parameter_type.lower()
599 ).value
600 except AttributeError:
601 raise EndpointResolutionError(
602 msg=f"Unknown parameter type: {parameter_type}. "
603 "A parameter must be of type string or boolean."
604 )
605 self.documentation = documentation
606 self.builtin = builtIn
607 self.default = default
608 self.required = required
609 self.deprecated = deprecated
611 def validate_input(self, value):
612 """Perform base validation on parameter input.
614 :type value: Any
615 :raises: EndpointParametersError
616 """
618 if not isinstance(value, self.parameter_type):
619 raise EndpointResolutionError(
620 msg=f"Value ({self.name}) is the wrong "
621 f"type. Must be {self.parameter_type}."
622 )
623 if self.deprecated is not None:
624 depr_str = f"{self.name} has been deprecated."
625 msg = self.deprecated.get("message")
626 since = self.deprecated.get("since")
627 if msg:
628 depr_str += f"\n{msg}"
629 if since:
630 depr_str += f"\nDeprecated since {since}."
631 logger.info(depr_str)
633 return None
635 def process_input(self, value):
636 """Process input against spec, applying default if value is None."""
637 if value is None:
638 if self.default is not None:
639 return self.default
640 if self.required:
641 raise EndpointResolutionError(
642 f"Cannot find value for required parameter {self.name}"
643 )
644 # in all other cases, the parameter will keep the value None
645 else:
646 self.validate_input(value)
647 return value
650class RuleSet:
651 """Collection of rules to derive a routable service endpoint."""
653 def __init__(
654 self, version, parameters, rules, partitions, documentation=None
655 ):
656 self.version = version
657 self.parameters = self._ingest_parameter_spec(parameters)
658 self.rules = [RuleCreator.create(**rule) for rule in rules]
659 self.rule_lib = RuleSetStandardLibrary(partitions)
660 self.documentation = documentation
662 def _ingest_parameter_spec(self, parameters):
663 return {
664 name: ParameterDefinition(
665 name,
666 spec["type"],
667 spec.get("documentation"),
668 spec.get("builtIn"),
669 spec.get("default"),
670 spec.get("required"),
671 spec.get("deprecated"),
672 )
673 for name, spec in parameters.items()
674 }
676 def process_input_parameters(self, input_params):
677 """Process each input parameter against its spec.
679 :type input_params: dict
680 """
681 for name, spec in self.parameters.items():
682 value = spec.process_input(input_params.get(name))
683 if value is not None:
684 input_params[name] = value
685 return None
687 def evaluate(self, input_parameters):
688 """Evaluate input parameters against rules returning first match.
690 :type input_parameters: dict
691 """
692 self.process_input_parameters(input_parameters)
693 for rule in self.rules:
694 evaluation = rule.evaluate(input_parameters.copy(), self.rule_lib)
695 if evaluation is not None:
696 return evaluation
697 return None
700class EndpointProvider:
701 """Derives endpoints from a RuleSet for given input parameters."""
703 def __init__(self, ruleset_data, partition_data):
704 self.ruleset = RuleSet(**ruleset_data, partitions=partition_data)
706 @lru_cache_weakref(maxsize=CACHE_SIZE)
707 def resolve_endpoint(self, **input_parameters):
708 """Match input parameters to a rule.
710 :type input_parameters: dict
711 :rtype: RuleSetEndpoint
712 """
713 params_for_error = input_parameters.copy()
714 endpoint = self.ruleset.evaluate(input_parameters)
715 if endpoint is None:
716 param_string = "\n".join(
717 [f"{key}: {value}" for key, value in params_for_error.items()]
718 )
719 raise EndpointResolutionError(
720 msg=f"No endpoint found for parameters:\n{param_string}"
721 )
722 return endpoint