Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/utils.py: 27%
569 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import copy
4from collections import UserString
5from datetime import datetime, timedelta
6from dateutil.tz import tzutc
7import json
8import itertools
9import ipaddress
10import logging
11import os
12import random
13import re
14import sys
15import threading
16import time
17from urllib import parse as urlparse
18from urllib.request import getproxies, proxy_bypass
20from dateutil.parser import ParserError, parse
22import jmespath
23from jmespath import functions
24from jmespath.parser import Parser, ParsedResult
26from c7n import config
27from c7n.exceptions import ClientError, PolicyValidationError
29# Try to play nice in a serverless environment, where we don't require yaml
31try:
32 import yaml
33except ImportError: # pragma: no cover
34 SafeLoader = BaseSafeDumper = yaml = None
35else:
36 try:
37 from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper
38 except ImportError: # pragma: no cover
39 from yaml import SafeLoader, SafeDumper as BaseSafeDumper
42class SafeDumper(BaseSafeDumper or object):
43 def ignore_aliases(self, data):
44 return True
47log = logging.getLogger('custodian.utils')
50class VarsSubstitutionError(Exception):
51 pass
54def load_file(path, format=None, vars=None):
55 if format is None:
56 format = 'yaml'
57 _, ext = os.path.splitext(path)
58 if ext[1:] == 'json':
59 format = 'json'
61 with open(path) as fh:
62 contents = fh.read()
64 if vars:
65 try:
66 contents = contents.format(**vars)
67 except IndexError:
68 msg = 'Failed to substitute variable by positional argument.'
69 raise VarsSubstitutionError(msg)
70 except KeyError as e:
71 msg = 'Failed to substitute variables. KeyError on {}'.format(str(e))
72 raise VarsSubstitutionError(msg)
74 if format == 'yaml':
75 return yaml_load(contents)
76 elif format == 'json':
77 return loads(contents)
80def yaml_load(value):
81 if yaml is None:
82 raise RuntimeError("Yaml not available")
83 return yaml.load(value, Loader=SafeLoader)
86def yaml_dump(value):
87 if yaml is None:
88 raise RuntimeError("Yaml not available")
89 return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)
92def loads(body):
93 return json.loads(body)
96def dumps(data, fh=None, indent=0):
97 if fh:
98 return json.dump(data, fh, cls=JsonEncoder, indent=indent)
99 else:
100 return json.dumps(data, cls=JsonEncoder, indent=indent)
103def format_event(evt):
104 return json.dumps(evt, indent=2)
107def filter_empty(d):
108 for k, v in list(d.items()):
109 if not v:
110 del d[k]
111 return d
114# We need a minimum floor when examining possible timestamp
115# values to distinguish from other numeric time usages. Use
116# the S3 Launch Date.
117DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))
120def parse_date(v, tz=None):
121 """Handle various permutations of a datetime serialization
122 to a datetime with the given timezone.
124 Handles strings, seconds since epoch, and milliseconds since epoch.
125 """
127 if v is None:
128 return v
130 tz = tz or tzutc()
132 if isinstance(v, datetime):
133 if v.tzinfo is None:
134 return v.astimezone(tz)
135 return v
137 if isinstance(v, str) and not v.isdigit():
138 try:
139 return parse(v).astimezone(tz)
140 except (AttributeError, TypeError, ValueError, OverflowError):
141 pass
143 # OSError on windows -- https://bugs.python.org/issue36439
144 exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError)
146 if isinstance(v, (int, float, str)):
147 try:
148 if float(v) > DATE_FLOOR:
149 v = datetime.fromtimestamp(float(v)).astimezone(tz)
150 except exceptions:
151 pass
153 if isinstance(v, (int, float, str)):
154 # try interpreting as milliseconds epoch
155 try:
156 if float(v) > DATE_FLOOR:
157 v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
158 except exceptions:
159 pass
161 return isinstance(v, datetime) and v or None
164def type_schema(
165 type_name, inherits=None, rinherit=None,
166 aliases=None, required=None, **props):
167 """jsonschema generation helper
169 params:
170 - type_name: name of the type
171 - inherits: list of document fragments that are required via anyOf[$ref]
172 - rinherit: use another schema as a base for this, basically work around
173 inherits issues with additionalProperties and type enums.
174 - aliases: additional names this type maybe called
175 - required: list of required properties, by default 'type' is required
176 - props: additional key value properties
177 """
178 if aliases:
179 type_names = [type_name]
180 type_names.extend(aliases)
181 else:
182 type_names = [type_name]
184 if rinherit:
185 s = copy.deepcopy(rinherit)
186 s['properties']['type'] = {'enum': type_names}
187 else:
188 s = {
189 'type': 'object',
190 'properties': {
191 'type': {'enum': type_names}}}
193 # Ref based inheritance and additional properties don't mix well.
194 # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
195 if not inherits:
196 s['additionalProperties'] = False
198 s['properties'].update(props)
200 for k, v in props.items():
201 if v is None:
202 del s['properties'][k]
203 if not required:
204 required = []
205 if isinstance(required, list):
206 required.append('type')
207 s['required'] = required
208 if inherits:
209 extended = s
210 s = {'allOf': [{'$ref': i} for i in inherits]}
211 s['allOf'].append(extended)
212 return s
215class JsonEncoder(json.JSONEncoder):
217 def default(self, obj):
218 if isinstance(obj, datetime):
219 return obj.isoformat()
220 if isinstance(obj, FormatDate):
221 return obj.datetime.isoformat()
222 if isinstance(obj, bytes):
223 return obj.decode('utf8', errors="ignore")
224 return json.JSONEncoder.default(self, obj)
227def group_by(resources, key):
228 """Return a mapping of key value to resources with the corresponding value.
230 Key may be specified as dotted form for nested dictionary lookup
231 """
232 resource_map = {}
233 parts = key.split('.')
234 for r in resources:
235 v = r
236 for k in parts:
237 v = v.get(k)
238 if not isinstance(v, dict):
239 break
240 resource_map.setdefault(v, []).append(r)
241 return resource_map
244def chunks(iterable, size=50):
245 """Break an iterable into lists of size"""
246 batch = []
247 for n in iterable:
248 batch.append(n)
249 if len(batch) % size == 0:
250 yield batch
251 batch = []
252 if batch:
253 yield batch
256def camelResource(obj, implicitDate=False, implicitTitle=True):
257 """Some sources from apis return lowerCased where as describe calls
259 always return TitleCase, this function turns the former to the later
261 implicitDate ~ automatically sniff keys that look like isoformat date strings
262 and convert to python datetime objects.
263 """
264 if not isinstance(obj, dict):
265 return obj
266 for k in list(obj.keys()):
267 v = obj.pop(k)
268 if implicitTitle:
269 ok = "%s%s" % (k[0].upper(), k[1:])
270 else:
271 ok = k
272 obj[ok] = v
274 if implicitDate:
275 # config service handles datetime differently then describe sdks
276 # the sdks use knowledge of the shape to support language native
277 # date times, while config just turns everything into a serialized
278 # json with mangled keys without type info. to normalize to describe
279 # we implicitly sniff keys which look like datetimes, and have an
280 # isoformat marker ('T').
281 kn = k.lower()
282 if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
283 try:
284 dv = parse_date(v)
285 except ParserError:
286 dv = None
287 if dv:
288 obj[ok] = dv
289 if isinstance(v, dict):
290 camelResource(v, implicitDate, implicitTitle)
291 elif isinstance(v, list):
292 for e in v:
293 camelResource(e, implicitDate, implicitTitle)
294 return obj
297def get_account_id_from_sts(session):
298 response = session.client('sts').get_caller_identity()
299 return response.get('Account')
302def get_account_alias_from_sts(session):
303 response = session.client('iam').list_account_aliases()
304 aliases = response.get('AccountAliases', ())
305 return aliases and aliases[0] or ''
308def query_instances(session, client=None, **query):
309 """Return a list of ec2 instances for the query.
310 """
311 if client is None:
312 client = session.client('ec2')
313 p = client.get_paginator('describe_instances')
314 results = p.paginate(**query)
315 return list(itertools.chain(
316 *[r["Instances"] for r in itertools.chain(
317 *[pp['Reservations'] for pp in results])]))
320CONN_CACHE = threading.local()
323def local_session(factory, region=None):
324 """Cache a session thread local for up to 45m"""
325 factory_region = getattr(factory, 'region', 'global')
326 if region:
327 factory_region = region
328 s = getattr(CONN_CACHE, factory_region, {}).get('session')
329 t = getattr(CONN_CACHE, factory_region, {}).get('time')
331 n = time.time()
332 if s is not None and t + (60 * 45) > n:
333 return s
334 s = factory()
336 setattr(CONN_CACHE, factory_region, {'session': s, 'time': n})
337 return s
340def reset_session_cache():
341 for k in [k for k in dir(CONN_CACHE) if not k.startswith('_')]:
342 setattr(CONN_CACHE, k, {})
345def annotation(i, k):
346 return i.get(k, ())
349def set_annotation(i, k, v):
350 """
351 >>> x = {}
352 >>> set_annotation(x, 'marker', 'a')
353 >>> annotation(x, 'marker')
354 ['a']
355 """
356 if not isinstance(i, dict):
357 raise ValueError("Can only annotate dictionaries")
359 if not isinstance(v, list):
360 v = [v]
362 if k in i:
363 ev = i.get(k)
364 if isinstance(ev, list):
365 ev.extend(v)
366 else:
367 i[k] = v
370def parse_s3(s3_path):
371 if not s3_path.startswith('s3://'):
372 raise ValueError("invalid s3 path")
373 ridx = s3_path.find('/', 5)
374 if ridx == -1:
375 ridx = None
376 bucket = s3_path[5:ridx]
377 s3_path = s3_path.rstrip('/')
378 if ridx is None:
379 key_prefix = ""
380 else:
381 key_prefix = s3_path[s3_path.find('/', 5):]
382 return s3_path, bucket, key_prefix
385REGION_PARTITION_MAP = {
386 'us-gov-east-1': 'aws-us-gov',
387 'us-gov-west-1': 'aws-us-gov',
388 'cn-north-1': 'aws-cn',
389 'cn-northwest-1': 'aws-cn',
390 'us-isob-east-1': 'aws-iso-b',
391 'us-iso-east-1': 'aws-iso'
392}
395def get_partition(region):
396 return REGION_PARTITION_MAP.get(region, 'aws')
399def generate_arn(
400 service, resource, partition='aws',
401 region=None, account_id=None, resource_type=None, separator='/'):
402 """Generate an Amazon Resource Name.
403 See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
404 """
405 if region and region in REGION_PARTITION_MAP:
406 partition = REGION_PARTITION_MAP[region]
407 if service == 's3':
408 region = ''
409 arn = 'arn:%s:%s:%s:%s:' % (
410 partition, service, region if region else '', account_id if account_id else '')
411 if resource_type:
412 if resource.startswith(separator):
413 separator = ''
414 arn = arn + '%s%s%s' % (resource_type, separator, resource)
415 else:
416 arn = arn + resource
417 return arn
420def snapshot_identifier(prefix, db_identifier):
421 """Return an identifier for a snapshot of a database or cluster.
422 """
423 now = datetime.now()
424 return '%s-%s-%s' % (prefix, db_identifier, now.strftime('%Y-%m-%d-%H-%M'))
427retry_log = logging.getLogger('c7n.retry')
430def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
431 """Decorator for retry boto3 api call on transient errors.
433 https://www.awsarchitectureblog.com/2015/03/backoff.html
434 https://en.wikipedia.org/wiki/Exponential_backoff
436 :param codes: A sequence of retryable error codes.
437 :param max_attempts: The max number of retries, by default the delay
438 time is proportional to the max number of attempts.
439 :param log_retries: Whether we should log retries, if specified
440 specifies the level at which the retry should be logged.
441 :param _max_delay: The maximum delay for any retry interval *note*
442 this parameter is only exposed for unit testing, as its
443 derived from the number of attempts.
445 Returns a function for invoking aws client calls that
446 retries on retryable error codes.
447 """
448 max_delay = max(min_delay, 2) ** max_attempts
450 def _retry(func, *args, ignore_err_codes=(), **kw):
451 for idx, delay in enumerate(
452 backoff_delays(min_delay, max_delay, jitter=True)):
453 try:
454 return func(*args, **kw)
455 except ClientError as e:
456 if e.response['Error']['Code'] in ignore_err_codes:
457 return
458 elif e.response['Error']['Code'] not in retry_codes:
459 raise
460 elif idx == max_attempts - 1:
461 raise
462 if log_retries:
463 retry_log.log(
464 log_retries,
465 "retrying %s on error:%s attempt:%d last delay:%0.2f",
466 func, e.response['Error']['Code'], idx, delay)
467 time.sleep(delay)
468 return _retry
471def backoff_delays(start, stop, factor=2.0, jitter=False):
472 """Geometric backoff sequence w/ jitter
473 """
474 cur = start
475 while cur <= stop:
476 if jitter:
477 yield cur - (cur * random.random() / 5)
478 else:
479 yield cur
480 cur = cur * factor
483def parse_cidr(value):
484 """Process cidr ranges."""
485 if isinstance(value, list) or isinstance(value, set):
486 return IPv4List([parse_cidr(item) for item in value])
487 klass = IPv4Network
488 if '/' not in value:
489 klass = ipaddress.ip_address
490 try:
491 v = klass(str(value))
492 except (ipaddress.AddressValueError, ValueError):
493 v = None
494 return v
497class IPv4Network(ipaddress.IPv4Network):
499 # Override for net 2 net containment comparison
500 def __contains__(self, other):
501 if other is None:
502 return False
503 if isinstance(other, ipaddress._BaseNetwork):
504 return self.supernet_of(other)
505 return super(IPv4Network, self).__contains__(other)
507 if (sys.version_info.major == 3 and sys.version_info.minor <= 6): # pragma: no cover
508 @staticmethod
509 def _is_subnet_of(a, b):
510 try:
511 # Always false if one is v4 and the other is v6.
512 if a._version != b._version:
513 raise TypeError(f"{a} and {b} are not of the same version")
514 return (b.network_address <= a.network_address and
515 b.broadcast_address >= a.broadcast_address)
516 except AttributeError:
517 raise TypeError(f"Unable to test subnet containment "
518 f"between {a} and {b}")
520 def supernet_of(self, other):
521 """Return True if this network is a supernet of other."""
522 return self._is_subnet_of(other, self)
525class IPv4List:
526 def __init__(self, ipv4_list):
527 self.ipv4_list = ipv4_list
529 def __contains__(self, other):
530 if other is None:
531 return False
532 in_networks = any([other in y_elem for y_elem in self.ipv4_list
533 if isinstance(y_elem, IPv4Network)])
534 in_addresses = any([other == y_elem for y_elem in self.ipv4_list
535 if isinstance(y_elem, ipaddress.IPv4Address)])
536 return any([in_networks, in_addresses])
539def reformat_schema(model):
540 """ Reformat schema to be in a more displayable format. """
541 if not hasattr(model, 'schema'):
542 return "Model '{}' does not have a schema".format(model)
544 if 'properties' not in model.schema:
545 return "Schema in unexpected format."
547 ret = copy.deepcopy(model.schema['properties'])
549 if 'type' in ret:
550 del ret['type']
552 for key in model.schema.get('required', []):
553 if key in ret:
554 ret[key]['required'] = True
556 return ret
559# from botocore.utils avoiding runtime dependency for botocore for other providers.
560# license apache 2.0
561def set_value_from_jmespath(source, expression, value, is_first=True):
562 # This takes a (limited) jmespath-like expression & can set a value based
563 # on it.
564 # Limitations:
565 # * Only handles dotted lookups
566 # * No offsets/wildcards/slices/etc.
567 bits = expression.split('.', 1)
568 current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''
570 if not current_key:
571 raise ValueError(expression)
573 if remainder:
574 if current_key not in source:
575 # We've got something in the expression that's not present in the
576 # source (new key). If there's any more bits, we'll set the key
577 # with an empty dictionary.
578 source[current_key] = {}
580 return set_value_from_jmespath(
581 source[current_key],
582 remainder,
583 value,
584 is_first=False
585 )
587 # If we're down to a single key, set it.
588 source[current_key] = value
591def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
592 """
593 Format all string values in an object.
594 Return the updated object
595 """
596 if isinstance(obj, dict):
597 new = {}
598 for key in obj.keys():
599 new[key] = format_string_values(obj[key], formatter=formatter, *args, **kwargs)
600 return new
601 elif isinstance(obj, list):
602 new = []
603 for item in obj:
604 new.append(format_string_values(item, formatter=formatter, *args, **kwargs))
605 return new
606 elif isinstance(obj, str):
607 try:
608 if formatter:
609 return formatter(obj, *args, **kwargs)
610 else:
611 return obj.format(*args, **kwargs)
612 except err_fallback:
613 return obj
614 else:
615 return obj
618def parse_url_config(url):
619 if url and '://' not in url:
620 url += "://"
621 conf = config.Bag()
622 parsed = urlparse.urlparse(url)
623 for k in ('scheme', 'netloc', 'path'):
624 conf[k] = getattr(parsed, k)
625 for k, v in urlparse.parse_qs(parsed.query).items():
626 conf[k] = v[0]
627 conf['url'] = url
628 return conf
631def join_output_path(output_path, *parts):
632 # allow users to specify interpolated output paths
633 if '{' in output_path:
634 return output_path
636 if "://" not in output_path:
637 return os.path.join(output_path, *parts)
639 # handle urls with query strings
640 parsed = urlparse.urlparse(output_path)
641 updated_path = "/".join((parsed.path, *parts))
642 parts = list(parsed)
643 parts[2] = updated_path
644 return urlparse.urlunparse(parts)
647def get_policy_provider(policy_data):
648 if isinstance(policy_data['resource'], list):
649 provider_name, _ = policy_data['resource'][0].split('.', 1)
650 elif '.' in policy_data['resource']:
651 provider_name, resource_type = policy_data['resource'].split('.', 1)
652 else:
653 provider_name = 'aws'
654 return provider_name
657def get_proxy_url(url):
658 proxies = getproxies()
659 parsed = urlparse.urlparse(url)
661 proxy_keys = [
662 parsed.scheme + '://' + parsed.netloc,
663 parsed.scheme,
664 'all://' + parsed.netloc,
665 'all'
666 ]
668 # Set port if not defined explicitly in url.
669 port = parsed.port
670 if port is None and parsed.scheme == 'http':
671 port = 80
672 elif port is None and parsed.scheme == 'https':
673 port = 443
675 hostname = parsed.hostname is not None and parsed.hostname or ''
677 # Determine if proxy should be used based on no_proxy entries.
678 # Note this does not support no_proxy ip or cidr entries.
679 if proxy_bypass("%s:%s" % (hostname, port)):
680 return None
682 for key in proxy_keys:
683 if key in proxies:
684 return proxies[key]
686 return None
689class DeferredFormatString(UserString):
690 """A string that returns itself when formatted
692 Let any format spec pass through. This lets us selectively defer
693 expansion of runtime variables without losing format spec details.
694 """
695 def __format__(self, format_spec):
696 return "".join(("{", self.data, f":{format_spec}" if format_spec else "", "}"))
699class FormatDate:
700 """a datetime wrapper with extended pyformat syntax"""
702 date_increment = re.compile(r'\+[0-9]+[Mdh]')
704 def __init__(self, d=None):
705 self._d = d
707 def __str__(self):
708 return str(self._d)
710 @property
711 def datetime(self):
712 return self._d
714 @classmethod
715 def utcnow(cls):
716 return cls(datetime.utcnow())
718 def __getattr__(self, k):
719 return getattr(self._d, k)
721 def __format__(self, fmt=None):
722 d = self._d
723 increments = self.date_increment.findall(fmt)
724 for i in increments:
725 p = {}
726 if i[-1] == 'M':
727 p['minutes'] = float(i[1:-1])
728 if i[-1] == 'h':
729 p['hours'] = float(i[1:-1])
730 if i[-1] == 'd':
731 p['days'] = float(i[1:-1])
732 d = d + timedelta(**p)
733 if increments:
734 fmt = self.date_increment.sub("", fmt)
735 return d.__format__(fmt)
738class QueryParser:
740 QuerySchema = {}
741 type_name = ''
742 multi_value = True
743 value_key = 'Values'
745 @classmethod
746 def parse(cls, data):
747 filters = []
748 if not isinstance(data, (tuple, list)):
749 raise PolicyValidationError(
750 "%s Query invalid format, must be array of dicts %s" % (
751 cls.type_name,
752 data))
753 for d in data:
754 if not isinstance(d, dict):
755 raise PolicyValidationError(
756 "%s Query Filter Invalid %s" % (cls.type_name, data))
757 if "Name" not in d or cls.value_key not in d:
758 raise PolicyValidationError(
759 "%s Query Filter Invalid: Missing Key or Values in %s" % (
760 cls.type_name, data))
762 key = d['Name']
763 values = d[cls.value_key]
765 if not cls.multi_value and isinstance(values, list):
766 raise PolicyValidationError(
767 "%s Query Filter Invalid Key: Value:%s Must be single valued" % (
768 cls.type_name, key))
769 elif not cls.multi_value:
770 values = [values]
772 if key not in cls.QuerySchema and not key.startswith('tag:'):
773 raise PolicyValidationError(
774 "%s Query Filter Invalid Key:%s Valid: %s" % (
775 cls.type_name, key, ", ".join(cls.QuerySchema.keys())))
777 vtype = cls.QuerySchema.get(key)
778 if vtype is None and key.startswith('tag'):
779 vtype = str
781 if not isinstance(values, list):
782 raise PolicyValidationError(
783 "%s Query Filter Invalid Values, must be array %s" % (
784 cls.type_name, data,))
786 for v in values:
787 if isinstance(vtype, tuple):
788 if v not in vtype:
789 raise PolicyValidationError(
790 "%s Query Filter Invalid Value: %s Valid: %s" % (
791 cls.type_name, v, ", ".join(vtype)))
792 elif not isinstance(v, vtype):
793 raise PolicyValidationError(
794 "%s Query Filter Invalid Value Type %s" % (
795 cls.type_name, data,))
797 filters.append(d)
799 return filters
802def get_annotation_prefix(s):
803 return 'c7n:{}'.format(s)
806def merge_dict_list(dict_iter):
807 """take an list of dictionaries and merge them.
809 last dict wins/overwrites on keys.
810 """
811 result = {}
812 for d in dict_iter:
813 result.update(d)
814 return result
817def merge_dict(a, b):
818 """Perform a merge of dictionaries a and b
820 Any subdictionaries will be recursively merged.
821 Any leaf elements in the form of a list or scalar will use the value from a
822 """
823 d = {}
824 for k, v in a.items():
825 if k not in b:
826 d[k] = v
827 elif isinstance(v, dict) and isinstance(b[k], dict):
828 d[k] = merge_dict(v, b[k])
829 for k, v in b.items():
830 if k not in d:
831 d[k] = v
832 return d
835def select_keys(d, keys):
836 result = {}
837 for k in keys:
838 result[k] = d.get(k)
839 return result
842def get_human_size(size, precision=2):
843 # interesting discussion on 1024 vs 1000 as base
844 # https://en.wikipedia.org/wiki/Binary_prefix
845 suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
846 suffixIndex = 0
847 while size > 1024:
848 suffixIndex += 1
849 size = size / 1024.0
851 return "%.*f %s" % (precision, size, suffixes[suffixIndex])
854def get_support_region(manager):
855 # support is a unique service in that it doesnt support regional endpoints
856 # thus, we need to construct the client based off the regions found here:
857 # https://docs.aws.amazon.com/general/latest/gr/awssupport.html
858 #
859 # aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
860 # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
861 # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html
863 partition = get_partition(manager.config.region)
864 support_region = None
865 if partition == "aws":
866 support_region = "us-east-1"
867 elif partition == "aws-us-gov":
868 support_region = "us-gov-west-1"
869 elif partition == "aws-cn":
870 support_region = "cn-north-1"
871 return support_region
874def get_eni_resource_type(eni):
875 if eni.get('Attachment'):
876 instance_id = eni['Attachment'].get('InstanceId')
877 else:
878 instance_id = None
879 description = eni.get('Description')
880 # EC2
881 if instance_id:
882 rtype = 'ec2'
883 # ELB/ELBv2
884 elif description.startswith('ELB app/'):
885 rtype = 'elb-app'
886 elif description.startswith('ELB net/'):
887 rtype = 'elb-net'
888 elif description.startswith('ELB gwy/'):
889 rtype = 'elb-gwy'
890 elif description.startswith('ELB'):
891 rtype = 'elb'
892 # Other Resources
893 elif description == 'ENI managed by APIGateway':
894 rtype = 'apigw'
895 elif description.startswith('AWS CodeStar Connections'):
896 rtype = 'codestar'
897 elif description.startswith('DAX'):
898 rtype = 'dax'
899 elif description.startswith('AWS created network interface for directory'):
900 rtype = 'dir'
901 elif description == 'DMSNetworkInterface':
902 rtype = 'dms'
903 elif description.startswith('arn:aws:ecs:'):
904 rtype = 'ecs'
905 elif description.startswith('EFS mount target for'):
906 rtype = 'fsmt'
907 elif description.startswith('ElastiCache'):
908 rtype = 'elasticache'
909 elif description.startswith('AWS ElasticMapReduce'):
910 rtype = 'emr'
911 elif description.startswith('CloudHSM Managed Interface'):
912 rtype = 'hsm'
913 elif description.startswith('CloudHsm ENI'):
914 rtype = 'hsmv2'
915 elif description.startswith('AWS Lambda VPC'):
916 rtype = 'lambda'
917 elif description.startswith('AWS Lambda VPC ENI'):
918 rtype = 'lambda'
919 elif description.startswith('Interface for NAT Gateway'):
920 rtype = 'nat'
921 elif (description == 'RDSNetworkInterface' or
922 description.startswith('Network interface for DBProxy')):
923 rtype = 'rds'
924 elif description == 'RedshiftNetworkInterface':
925 rtype = 'redshift'
926 elif description.startswith('Network Interface for Transit Gateway Attachment'):
927 rtype = 'tgw'
928 elif description.startswith('VPC Endpoint Interface'):
929 rtype = 'vpce'
930 elif description.startswith('aws-k8s-branch-eni'):
931 rtype = 'eks'
932 else:
933 rtype = 'unknown'
934 return rtype
937class C7NJmespathFunctions(functions.Functions):
938 @functions.signature(
939 {'types': ['string']}, {'types': ['string']}
940 )
941 def _func_split(self, sep, string):
942 return string.split(sep)
945class C7NJMESPathParser(Parser):
946 def parse(self, expression):
947 result = super().parse(expression)
948 return ParsedResultWithOptions(
949 expression=result.expression,
950 parsed=result.parsed
951 )
954class ParsedResultWithOptions(ParsedResult):
955 def search(self, value, options=None):
956 # if options are explicitly passed in, we honor those
957 if not options:
958 options = jmespath.Options(custom_functions=C7NJmespathFunctions())
959 return super().search(value, options)
962def jmespath_search(*args, **kwargs):
963 return jmespath.search(
964 *args,
965 **kwargs,
966 options=jmespath.Options(custom_functions=C7NJmespathFunctions())
967 )
970def jmespath_compile(expression):
971 parsed = C7NJMESPathParser().parse(expression)
972 return parsed