Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/utils.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import copy
4from collections import UserString
5from datetime import datetime, timedelta
6from dateutil.tz import tzutc
7import json
8import itertools
9import ipaddress
10import logging
11import os
12import random
13import re
14import sys
15import threading
16import time
17from urllib import parse as urlparse
18from urllib.request import getproxies, proxy_bypass
20from dateutil.parser import ParserError, parse
22import jmespath
23from jmespath import functions
24from jmespath.parser import Parser, ParsedResult
26from c7n import config
27from c7n.exceptions import ClientError, PolicyValidationError
# Try to play nice in a serverless environment, where we don't require yaml
try:
    import yaml
except ImportError:  # pragma: no cover
    # yaml is optional; None sentinels let yaml_load/yaml_dump fail lazily
    SafeLoader = BaseSafeDumper = yaml = None
else:
    try:
        # prefer the C-accelerated loader/dumper when libyaml is available
        from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper
    except ImportError:  # pragma: no cover
        from yaml import SafeLoader, SafeDumper as BaseSafeDumper
class SafeDumper(BaseSafeDumper or object):
    # Safe yaml dumper that never emits anchors/aliases (&id/*id) for
    # shared objects. BaseSafeDumper may be None when yaml is not
    # installed, hence the `or object` base-class fallback.
    def ignore_aliases(self, data):
        return True
# module level logger for the utils helpers
log = logging.getLogger('custodian.utils')
class VarsSubstitutionError(Exception):
    """Raised when variable interpolation into a policy file fails."""
    pass
def load_file(path, format=None, vars=None):
    """Read and deserialize a yaml or json file.

    Format defaults to yaml, but a ``.json`` extension always forces
    json regardless of the ``format`` argument. When ``vars`` is
    given, the raw text is interpolated via ``str.format`` first.

    :raises VarsSubstitutionError: on a failed variable substitution.
    """
    if format is None:
        format = 'yaml'
    # the file extension wins over the caller supplied format
    if os.path.splitext(path)[1][1:] == 'json':
        format = 'json'

    with open(path) as fh:
        contents = fh.read()

    if vars:
        try:
            contents = contents.format(**vars)
        except IndexError:
            raise VarsSubstitutionError(
                'Failed to substitute variable by positional argument.')
        except KeyError as e:
            raise VarsSubstitutionError(
                'Failed to substitute variables. KeyError on {}'.format(str(e)))

    if format == 'yaml':
        return yaml_load(contents)
    elif format == 'json':
        return loads(contents)
def yaml_load(value):
    """Parse *value* as yaml with the safe loader.

    :raises RuntimeError: when pyyaml was not importable.
    """
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.load(value, Loader=SafeLoader)
def yaml_dump(value):
    """Serialize *value* to yaml (block style, no anchors).

    :raises RuntimeError: when pyyaml was not importable.
    """
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)
def loads(body):
    """Deserialize a json string."""
    return json.loads(body)
def dumps(data, fh=None, indent=0):
    """Serialize *data* as json, writing to *fh* when given.

    Uses JsonEncoder so datetimes, FormatDate wrappers and bytes
    serialize cleanly.
    """
    if not fh:
        return json.dumps(data, cls=JsonEncoder, indent=indent)
    return json.dump(data, fh, cls=JsonEncoder, indent=indent)
def format_event(evt):
    """Pretty-print an event payload for logging."""
    return json.dumps(evt, indent=2)
def filter_empty(d):
    """Remove falsy-valued keys from *d* in place and return it."""
    for key in [k for k, v in d.items() if not v]:
        del d[key]
    return d
# We need a minimum floor when examining possible timestamp
# values to distinguish from other numeric time usages. Use
# the S3 Launch Date.
# (seconds since epoch; time.mktime interprets the tuple as local time)
DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))
def parse_date(v, tz=None):
    """Handle various permutations of a datetime serialization
    to a datetime with the given timezone.

    Handles strings, seconds since epoch, and milliseconds since epoch.

    Returns None when the value cannot be interpreted as a datetime.
    """

    if v is None:
        return v

    tz = tz or tzutc()

    if isinstance(v, datetime):
        # NOTE(review): astimezone on a naive datetime assumes local
        # time before converting -- confirm callers expect that.
        if v.tzinfo is None:
            return v.astimezone(tz)
        return v

    # non-numeric strings go straight to dateutil's parser
    if isinstance(v, str) and not v.isdigit():
        try:
            return parse(v).astimezone(tz)
        except (AttributeError, TypeError, ValueError, OverflowError):
            pass

    # OSError on windows -- https://bugs.python.org/issue36439
    exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError)

    # first try interpreting the number as a seconds epoch; the floor
    # guards against treating small counters/ids as timestamps
    if isinstance(v, (int, float, str)):
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v)).astimezone(tz)
        except exceptions:
            pass

    # if the seconds interpretation failed (v still numeric), fall
    # through to a milliseconds epoch interpretation
    if isinstance(v, (int, float, str)):
        # try interpreting as milliseconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
        except exceptions:
            pass

    return isinstance(v, datetime) and v or None
def type_schema(
        type_name, inherits=None, rinherit=None,
        aliases=None, required=None, **props):
    """jsonschema generation helper

    params:
      - type_name: name of the type
      - inherits: list of document fragments that are required via anyOf[$ref]
      - rinherit: use another schema as a base for this, basically work around
                  inherits issues with additionalProperties and type enums.
      - aliases: additional names this type maybe called
      - required: list of required properties, by default 'type' is required
      - props: additional key value properties
    """
    type_names = [type_name]
    if aliases:
        type_names.extend(aliases)

    if rinherit:
        s = copy.deepcopy(rinherit)
        s['properties']['type'] = {'enum': type_names}
    else:
        s = {
            'type': 'object',
            'properties': {
                'type': {'enum': type_names}}}

    # Ref based inheritance and additional properties don't mix well.
    # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
    if not inherits:
        s['additionalProperties'] = False

    s['properties'].update(props)

    # a None prop value acts as a deletion marker against rinherit bases
    for k, v in props.items():
        if v is None:
            del s['properties'][k]

    if not required:
        required = []
    if isinstance(required, list):
        # bugfix: copy instead of appending, so we never mutate a
        # caller-supplied required list across repeated invocations
        required = required + ['type']
    s['required'] = required
    if inherits:
        extended = s
        s = {'allOf': [{'$ref': i} for i in inherits]}
        s['allOf'].append(extended)
    return s
class JsonEncoder(json.JSONEncoder):
    """json encoder aware of the extra types custodian serializes.

    datetimes and FormatDate wrappers become isoformat strings; bytes
    decode as utf8 with undecodable sequences dropped.
    """

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FormatDate):
            return obj.datetime.isoformat()
        if isinstance(obj, bytes):
            return obj.decode('utf8', errors="ignore")
        return super().default(obj)
def group_by(resources, key):
    """Return a mapping of key value to resources with the corresponding value.

    Key may be specified as dotted form for nested dictionary lookup
    """
    grouped = {}
    path = key.split('.')
    for resource in resources:
        value = resource
        for part in path:
            value = value.get(part)
            # stop descending once we hit a leaf (or a missing key)
            if not isinstance(value, dict):
                break
        grouped.setdefault(value, []).append(resource)
    return grouped
def chunks(iterable, size=50):
    """Break an iterable into lists of size"""
    buf = []
    for item in iterable:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []
    # flush any trailing partial chunk
    if buf:
        yield buf
def camelResource(obj, implicitDate=False, implicitTitle=True):
    """Some sources from apis return lowerCased where as describe calls

    always return TitleCase, this function turns the former to the later

    implicitDate ~ automatically sniff keys that look like isoformat date strings
     and convert to python datetime objects.

    Mutates *obj* in place (recursively) and returns it.
    """
    if not isinstance(obj, dict):
        return obj
    # iterate over a key snapshot since we pop/re-insert while looping
    for k in list(obj.keys()):
        v = obj.pop(k)
        if implicitTitle:
            # NOTE(review): an empty-string key would raise IndexError here
            ok = "%s%s" % (k[0].upper(), k[1:])
        else:
            ok = k
        obj[ok] = v

        if implicitDate:
            # config service handles datetime differently then describe sdks
            # the sdks use knowledge of the shape to support language native
            # date times, while config just turns everything into a serialized
            # json with mangled keys without type info. to normalize to describe
            # we implicitly sniff keys which look like datetimes, and have an
            # isoformat marker ('T').
            kn = k.lower()
            if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
                try:
                    dv = parse_date(v)
                except ParserError:
                    dv = None
                if dv:
                    obj[ok] = dv
        # recurse into nested containers so the whole structure is normalized
        if isinstance(v, dict):
            camelResource(v, implicitDate, implicitTitle)
        elif isinstance(v, list):
            for e in v:
                camelResource(e, implicitDate, implicitTitle)
    return obj
def get_account_id_from_sts(session):
    """Return the current aws account id via sts get-caller-identity."""
    return session.client('sts').get_caller_identity().get('Account')
def get_account_alias_from_sts(session):
    """Return the first configured iam account alias, or '' when unset."""
    response = session.client('iam').list_account_aliases()
    aliases = response.get('AccountAliases', ())
    return aliases[0] if aliases else ''
def query_instances(session, client=None, **query):
    """Return a list of ec2 instances for the query.
    """
    if client is None:
        client = session.client('ec2')
    pages = client.get_paginator('describe_instances').paginate(**query)
    instances = []
    for page in pages:
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    return instances
# per-thread cache of provider sessions, keyed by region (see local_session)
CONN_CACHE = threading.local()
def local_session(factory, region=None):
    """Cache a session thread local for up to 45m"""
    cache_key = region or getattr(factory, 'region', 'global')
    entry = getattr(CONN_CACHE, cache_key, {})
    session = entry.get('session')
    created = entry.get('time')

    now = time.time()
    # reuse the cached session while it is younger than 45 minutes
    if session is not None and created + (60 * 45) > now:
        return session
    session = factory()

    setattr(CONN_CACHE, cache_key, {'session': session, 'time': now})
    return session
def reset_session_cache():
    # drop every cached per-region session on this thread
    for k in [k for k in dir(CONN_CACHE) if not k.startswith('_')]:
        setattr(CONN_CACHE, k, {})

    # imported lazily to avoid an import cycle with c7n.credentials
    from .credentials import CustodianSession
    CustodianSession.close()
def annotation(i, k):
    """Return the annotation stored under key *k*, or an empty tuple."""
    return i.get(k, ())
def set_annotation(i, k, v):
    """Record value(s) *v* under annotation key *k* of dict *i*.

    New keys get a list; existing list values are extended. An existing
    non-list value is left untouched.

    >>> x = {}
    >>> set_annotation(x, 'marker', 'a')
    >>> annotation(x, 'marker')
    ['a']
    """
    if not isinstance(i, dict):
        raise ValueError("Can only annotate dictionaries")

    values = v if isinstance(v, list) else [v]

    if k not in i:
        i[k] = values
    elif isinstance(i[k], list):
        i[k].extend(values)
def parse_s3(s3_path):
    """Split an s3://bucket[/prefix] url into (path, bucket, key_prefix)."""
    if not s3_path.startswith('s3://'):
        raise ValueError("invalid s3 path")
    slash = s3_path.find('/', 5)
    bucket = s3_path[5:] if slash == -1 else s3_path[5:slash]
    s3_path = s3_path.rstrip('/')
    if slash == -1:
        key_prefix = ""
    else:
        key_prefix = s3_path[s3_path.find('/', 5):]
    return s3_path, bucket, key_prefix
# regions whose arn partition differs from the default 'aws'
REGION_PARTITION_MAP = {
    'us-gov-east-1': 'aws-us-gov',
    'us-gov-west-1': 'aws-us-gov',
    'cn-north-1': 'aws-cn',
    'cn-northwest-1': 'aws-cn',
    'us-isob-east-1': 'aws-iso-b',
    'us-iso-east-1': 'aws-iso'
}


def get_partition(region):
    # map a region name to its arn partition, defaulting to 'aws'
    return REGION_PARTITION_MAP.get(region, 'aws')
def generate_arn(
        service, resource, partition='aws',
        region=None, account_id=None, resource_type=None, separator='/'):
    """Generate an Amazon Resource Name.
    See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
    """
    # a known gov/cn/iso region overrides the caller supplied partition
    if region and region in REGION_PARTITION_MAP:
        partition = REGION_PARTITION_MAP[region]
    if service == 's3':
        # s3 arns never carry a region component
        region = ''
    arn = 'arn:%s:%s:%s:%s:' % (
        partition, service, region or '', account_id or '')
    if not resource_type:
        return arn + resource
    if resource.startswith(separator):
        separator = ''
    return arn + '%s%s%s' % (resource_type, separator, resource)
def snapshot_identifier(prefix, db_identifier):
    """Return an identifier for a snapshot of a database or cluster.

    Timestamped to the current local minute.
    """
    return '%s-%s-%s' % (
        prefix, db_identifier, datetime.now().strftime('%Y-%m-%d-%H-%M'))
# logger used by get_retry to report retried api calls
retry_log = logging.getLogger('c7n.retry')
def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
    """Decorator for retry boto3 api call on transient errors.

    https://www.awsarchitectureblog.com/2015/03/backoff.html
    https://en.wikipedia.org/wiki/Exponential_backoff

    :param codes: A sequence of retryable error codes.
    :param max_attempts: The max number of retries, by default the delay
           time is proportional to the max number of attempts.
    :param log_retries: Whether we should log retries, if specified
           specifies the level at which the retry should be logged.
    :param _max_delay: The maximum delay for any retry interval *note*
           this parameter is only exposed for unit testing, as its
           derived from the number of attempts.

    Returns a function for invoking aws client calls that
    retries on retryable error codes.
    """
    # delays grow geometrically up to this ceiling
    max_delay = max(min_delay, 2) ** max_attempts

    def _retry(func, *args, ignore_err_codes=(), **kw):
        for idx, delay in enumerate(
                backoff_delays(min_delay, max_delay, jitter=True)):
            try:
                return func(*args, **kw)
            except ClientError as e:
                # ignored codes return None rather than raising
                if e.response['Error']['Code'] in ignore_err_codes:
                    return
                elif e.response['Error']['Code'] not in retry_codes:
                    raise
                elif idx == max_attempts - 1:
                    # out of attempts; surface the last error
                    raise
                if log_retries:
                    # log_retries doubles as the logging level
                    retry_log.log(
                        log_retries,
                        "retrying %s on error:%s attempt:%d last delay:%0.2f",
                        func, e.response['Error']['Code'], idx, delay)
            time.sleep(delay)
    return _retry
def backoff_delays(start, stop, factor=2.0, jitter=False):
    """Geometric backoff sequence w/ jitter

    Yields delays from *start* growing by *factor* until *stop* is
    exceeded; jitter shaves up to 20% off each delay.
    """
    delay = start
    while delay <= stop:
        yield delay - (delay * random.random() / 5) if jitter else delay
        delay = delay * factor
def parse_cidr(value):
    """Process cidr ranges."""
    # collections recurse into an IPv4List wrapper
    if isinstance(value, (list, set)):
        return IPv4List([parse_cidr(item) for item in value])
    # bare addresses (no mask) parse as addresses, not networks
    klass = ipaddress.ip_address if '/' not in value else IPv4Network
    try:
        return klass(str(value))
    except (ipaddress.AddressValueError, ValueError):
        return None
class IPv4Network(ipaddress.IPv4Network):
    """IPv4Network where `in` also accepts another network (supernet test)."""

    # Override for net 2 net containment comparison
    def __contains__(self, other):
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            return self.supernet_of(other)
        return super(IPv4Network, self).__contains__(other)

    # backfill supernet_of for python <= 3.6 where it does not exist
    if (sys.version_info.major == 3 and sys.version_info.minor <= 6):  # pragma: no cover
        @staticmethod
        def _is_subnet_of(a, b):
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (b.network_address <= a.network_address and
                        b.broadcast_address >= a.broadcast_address)
            except AttributeError:
                raise TypeError(f"Unable to test subnet containment "
                                f"between {a} and {b}")

        def supernet_of(self, other):
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)
class IPv4List:
    """Collection of networks/addresses supporting `in` membership checks."""

    def __init__(self, ipv4_list):
        self.ipv4_list = ipv4_list

    def __contains__(self, other):
        if other is None:
            return False
        for elem in self.ipv4_list:
            # network elements test containment, addresses test equality
            if isinstance(elem, IPv4Network) and other in elem:
                return True
            if isinstance(elem, ipaddress.IPv4Address) and other == elem:
                return True
        return False
def reformat_schema(model):
    """ Reformat schema to be in a more displayable format. """
    if not hasattr(model, 'schema'):
        return "Model '{}' does not have a schema".format(model)

    if 'properties' not in model.schema:
        return "Schema in unexpected format."

    ret = copy.deepcopy(model.schema['properties'])

    # the 'type' enum is implementation detail, not user facing
    ret.pop('type', None)

    # surface required-ness on each property entry
    for key in model.schema.get('required', []):
        if key in ret:
            ret[key]['required'] = True

    return ret
# from botocore.utils avoiding runtime dependency for botocore for other providers.
# license apache 2.0
def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set *value* into nested dict *source* at a dotted-path expression.

    Limitations (this is only jmespath-like):
      * Only handles dotted lookups
      * No offsets/wildcards/slices/etc.

    Intermediate dictionaries are created as needed.
    """
    key, _, remainder = expression.partition('.')

    if not key:
        raise ValueError(expression)

    if remainder:
        # descend, creating an empty dict for any missing intermediate key
        node = source.setdefault(key, {})
        return set_value_from_jmespath(node, remainder, value, is_first=False)

    # terminal key: assign directly
    source[key] = value
594def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
595 """
596 Format all string values in an object.
597 Return the updated object
598 """
599 if isinstance(obj, dict):
600 new = {}
601 for key in obj.keys():
602 new[key] = format_string_values(obj[key], formatter=formatter, *args, **kwargs)
603 return new
604 elif isinstance(obj, list):
605 new = []
606 for item in obj:
607 new.append(format_string_values(item, formatter=formatter, *args, **kwargs))
608 return new
609 elif isinstance(obj, str):
610 try:
611 if formatter:
612 return formatter(obj, *args, **kwargs)
613 else:
614 return obj.format(*args, **kwargs)
615 except err_fallback:
616 return obj
617 else:
618 return obj
def parse_url_config(url):
    """Parse a url into a config Bag of scheme/netloc/path plus query params."""
    # a bare word like 'stdout' still parses as a scheme
    if url and '://' not in url:
        url += "://"
    parsed = urlparse.urlparse(url)
    conf = config.Bag()
    for attr in ('scheme', 'netloc', 'path'):
        conf[attr] = getattr(parsed, attr)
    # only the first value of each query parameter is retained
    for param, values in urlparse.parse_qs(parsed.query).items():
        conf[param] = values[0]
    conf['url'] = url
    return conf
def join_output_path(output_path, *parts):
    # allow users to specify interpolated output paths
    if '{' in output_path:
        return output_path

    # plain filesystem paths use os joining semantics
    if "://" not in output_path:
        return os.path.join(output_path, *parts)

    # handle urls with query strings
    parsed = urlparse.urlparse(output_path)
    components = list(parsed)
    components[2] = "/".join((parsed.path, *parts))
    return urlparse.urlunparse(components)
def get_policy_provider(policy_data):
    """Infer the cloud provider name from a policy's resource type."""
    resource = policy_data['resource']
    if isinstance(resource, list):
        return resource[0].split('.', 1)[0]
    if '.' in resource:
        return resource.split('.', 1)[0]
    # unprefixed resource types are aws by convention
    return 'aws'
def get_proxy_url(url):
    """Resolve the environment-configured proxy for *url*, if any.

    Returns None when no proxy applies or a no_proxy entry matches.
    """
    proxies = getproxies()
    parsed = urlparse.urlparse(url)

    # Set port if not defined explicitly in url.
    port = parsed.port
    if port is None:
        port = {'http': 80, 'https': 443}.get(parsed.scheme)

    hostname = parsed.hostname or ''

    # Determine if proxy should be used based on no_proxy entries.
    # Note this does not support no_proxy ip or cidr entries.
    if proxy_bypass("%s:%s" % (hostname, port)):
        return None

    # most specific key first
    candidates = (
        parsed.scheme + '://' + parsed.netloc,
        parsed.scheme,
        'all://' + parsed.netloc,
        'all',
    )
    for key in candidates:
        if key in proxies:
            return proxies[key]
    return None
class DeferredFormatString(UserString):
    """A string that returns itself when formatted

    Let any format spec pass through. This lets us selectively defer
    expansion of runtime variables without losing format spec details.
    """
    def __format__(self, format_spec):
        spec = f":{format_spec}" if format_spec else ""
        return "{" + self.data + spec + "}"
class FormatDate:
    """a datetime wrapper with extended pyformat syntax"""

    # trailing increments in a format spec, e.g. '+1d', '+2h', '+30M'
    date_increment = re.compile(r'\+[0-9]+[Mdh]')

    def __init__(self, d=None):
        self._d = d

    def __str__(self):
        return str(self._d)

    @property
    def datetime(self):
        return self._d

    @classmethod
    def utcnow(cls):
        return cls(datetime.utcnow())

    def __getattr__(self, k):
        # delegate everything else to the wrapped datetime
        return getattr(self._d, k)

    def __format__(self, fmt=None):
        result = self._d
        tokens = self.date_increment.findall(fmt)
        for token in tokens:
            unit = {'M': 'minutes', 'h': 'hours', 'd': 'days'}[token[-1]]
            result = result + timedelta(**{unit: float(token[1:-1])})
        if tokens:
            # strip the increment markers before normal strftime handling
            fmt = self.date_increment.sub("", fmt)
        return result.__format__(fmt)
class QueryParser:
    """Validate user supplied api query filters against a per-resource schema.

    Subclasses configure QuerySchema/type_name; parse() is the entry point.
    """

    # mapping of query key -> expected value type, tuple of allowed
    # values, or the literal 'date'; may contain a nested 'Filters' map
    QuerySchema = {}
    type_name = ''
    # Allow multiple values to be passed to a query param
    multi_value = True
    # If using multi_value, specify scalar fields here
    single_value_fields = ()

    @classmethod
    def is_implicit_query_filter(cls, data):
        # detect bare key/value pairs that are really api 'Filters'
        # entries given without the {'Filters': [...]} envelope
        key = list(data[0].keys())[0]
        if (key not in cls.QuerySchema and 'Filters' in cls.QuerySchema and
                (key in cls.QuerySchema['Filters'] or key.startswith('tag:'))):
            return True
        return False

    @classmethod
    def implicit_qfilter_translate(cls, data):
        # rewrap bare key/value pairs into canonical Name/Values filters
        filters = []
        for d in data:
            key = list(d.keys())[0]
            values = list(d.values())[0]
            if not isinstance(values, list):
                values = [values]
            filters.append({'Name': key, 'Values': values})
        return [{'Filters': filters}]

    @classmethod
    def parse(cls, data):
        """Validate *data* (a list of single-key dicts) and return the
        normalized query list.

        :raises PolicyValidationError: on any structural or type error.
        """
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Format, must be array of dicts"
            )

        # Backwards compatibility
        if data:
            if not isinstance(data[0], dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Format, must be array of dicts"
                )
            # Check for query filter key value pairs not listed under 'Filters' key
            if cls.is_implicit_query_filter(data):
                data = cls.implicit_qfilter_translate(data)

            # Support iam-policy and elasticache 'Name', 'Value' queries without 'Filters' key
            if (data[0].get('Value') and
                    (cls.type_name == 'IAM Policy' or cls.type_name == 'ElastiCache')):
                try:
                    data = [{d['Name']: d['Value']} for d in data]
                except KeyError:
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Format. "
                        f"Query: {data} is not a list of key-value pairs "
                        f"from {cls.QuerySchema}"
                    )

            # Support ebs-snapshot and volume 'Name', 'Values' queries without 'Filters' key
            elif (data[0].get('Values') and
                    (cls.type_name == 'EBS Snapshot' or
                     cls.type_name == 'EBS Volume')):
                data = [{"Filters": data}]

        results = []
        names = set()
        for d in data:
            if not isinstance(d, dict):
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )
            # each entry must hold exactly one key
            if not len(list(d.keys())) == 1:
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )

            if d.get("Filters"):
                results.append({"Filters": cls.parse_qfilters(d["Filters"])})
            else:
                key, value = cls.parse_query(d)

                # Allow for multiple queries with the same key
                if key in names and (not cls.multi_value or key in cls.single_value_fields):
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Key: {key} Must be unique")
                elif key in names:
                    # merge repeated keys into the existing entry
                    for q in results:
                        if list(q.keys())[0] == key:
                            q[key].append(d[key])
                else:
                    names.add(key)
                    results.append({key: value})

        return results

    @classmethod
    def parse_qfilters(cls, data):
        # validate a canonical list of {'Name': ..., 'Values': [...]} filters
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
            )

        results = []
        names = set()
        for f in data:
            if not isinstance(f, dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
                )
            if "Name" not in f or "Values" not in f:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid: Each filter must "
                    "contain 'Name' and 'Values' keys."
                )

            key = f['Name']
            values = f['Values']

            # tag:* keys are always allowed alongside the schema keys
            if key not in cls.QuerySchema.get("Filters", {}) and not key.startswith('tag:'):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Key: {key} "
                    f"Valid: {', '.join(cls.QuerySchema.keys())}"
                )

            if not isinstance(values, list):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Value {f} for key {key}, must be array.")

            vtype = cls.QuerySchema["Filters"].get(key)
            if vtype is None and key.startswith('tag'):
                vtype = str

            for v in values:
                cls.type_check(vtype, v)

            # Allow for multiple queries with the same key
            if key in names:
                for qf in results:
                    if qf['Name'] == key:
                        qf['Values'].extend(values)
            else:
                names.add(key)
                results.append({'Name': key, 'Values': values})

        return results

    @classmethod
    def parse_query(cls, data):
        # validate a single {key: value} query entry; returns (key, values)
        key = list(data.keys())[0]
        values = list(data.values())[0]

        if (not cls.multi_value or key in cls.single_value_fields) and isinstance(values, list):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value {values}: Value for {key} must be scalar"
            )
        elif (cls.multi_value and key not in cls.single_value_fields
              and not isinstance(values, list)):
            # normalize scalars to lists for multi-value keys
            values = [values]

        if key not in cls.QuerySchema:
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Key: {key} "
                f"Valid: {', '.join(cls.QuerySchema.keys())}"
            )

        vtype = cls.QuerySchema.get(key)
        if isinstance(values, list):
            for v in values:
                cls.type_check(vtype, v)
        else:
            cls.type_check(vtype, values)

        return key, values

    @classmethod
    def type_check(cls, vtype, value):
        # vtype may be a tuple of allowed values, the marker 'date',
        # or a python type for an isinstance check
        if isinstance(vtype, tuple):
            if value not in vtype:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Value: {value} Valid: {', '.join(vtype)}")
        elif vtype == 'date':
            if not parse_date(value):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Date Value: {value}")
        elif not isinstance(value, vtype):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value Type {value}"
            )
def get_annotation_prefix(s):
    """Return the c7n-namespaced annotation key for *s*."""
    return 'c7n:{}'.format(s)
def merge_dict_list(dict_iter):
    """take an list of dictionaries and merge them.

    last dict wins/overwrites on keys.
    """
    merged = {}
    for item in dict_iter:
        merged = {**merged, **item}
    return merged
def merge_dict(a, b):
    """Perform a merge of dictionaries A and B

    Any subdictionaries will be recursively merged.
    Any leaf elements in the form of scalar will use the value from B.
    If A is a str and B is a list, A will be inserted into the front of the list.
    If A is a list and B is a str, B will be appended to the list.
    If there are two lists for the same key, the lists will be merged
    deduplicated with values in A first, followed by any additional values from B.
    """
    d = copy.deepcopy(a)
    for k, v in b.items():
        if k not in d:
            d[k] = v
            continue
        cur = d[k]
        if isinstance(cur, dict) and isinstance(v, dict):
            d[k] = merge_dict(cur, v)
        elif isinstance(cur, list) and isinstance(v, list):
            # dedup merge: A's items keep position, new B items append
            for val in v:
                if val not in cur:
                    cur.append(val)
        elif isinstance(v, str) and isinstance(cur, list):
            if v not in cur:
                cur.append(v)
        elif isinstance(v, list) and isinstance(cur, str):
            if cur in v:
                d[k] = v
            else:
                d[k] = [cur]
                d[k].extend(v)
        elif isinstance(v, (int, str, float, bool)):
            # scalar in B wins over whatever A held
            d[k] = v
        else:
            raise Exception(f"k={k}, {type(v)} and {type(d[k])} not conformable.")
    return d
def compare_dicts_using_sets(a, b) -> bool:
    """Compares two dicts and replaces any lists or strings with sets

    Compares any lists in the dict as sets.

    NOTE(review): this normalizes values of *a* in place (list/str
    entries of `a` are overwritten with sets) -- callers should not
    rely on `a` being unchanged afterwards.
    """

    if a.keys() != b.keys():
        return False

    for k, v in b.items():
        # normalize both sides to sets before comparing
        if isinstance(v, list):
            v = format_to_set(v)
        if isinstance(a[k], str):
            a[k] = format_to_set(a[k])
        if isinstance(a[k], list):
            a[k] = format_to_set(a[k])
        if isinstance(v, str):
            v = format_to_set(v)
        # nested dicts recurse; on success skip the direct comparison
        if isinstance(a[k], dict) and isinstance(v, dict):
            if compare_dicts_using_sets(a[k], v):
                continue
        if v != a[k]:
            return False
    return True
def format_to_set(x) -> set:
    """Formats lists and strings to sets.

    Strings return as a set with one string.
    Lists return as a set.
    Variables of other datatypes will return as the original datatype.
    """
    if isinstance(x, str):
        return {x}
    if isinstance(x, list):
        return set(x)
    return x
def format_dict_with_sets(x: dict) -> dict:
    """Formats string and list values in a dict to sets.

    Any string value returns as a set with one string.
    Any list values return as a set.
    Returns a formatted dict.
    """
    if not isinstance(x, dict):
        return x
    return {
        key: format_dict_with_sets(value) if isinstance(value, dict)
        else format_to_set(value)
        for key, value in x.items()}
def select_keys(d, keys):
    """Return a dict of the given keys from *d*; missing keys map to None."""
    return {k: d.get(k) for k in keys}
def get_human_size(size, precision=2):
    """Render a byte count as a human readable string, e.g. '1.50 MB'.

    interesting discussion on 1024 vs 1000 as base
    https://en.wikipedia.org/wiki/Binary_prefix
    """
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    suffix_index = 0
    # bugfix: stop at the largest suffix instead of running the index
    # off the end of the list for sizes beyond PB
    while size > 1024 and suffix_index < len(suffixes) - 1:
        suffix_index += 1
        size = size / 1024.0

    return "%.*f %s" % (precision, size, suffixes[suffix_index])
def get_support_region(manager):
    # support is a unique service in that it doesnt support regional endpoints
    # thus, we need to construct the client based off the regions found here:
    # https://docs.aws.amazon.com/general/latest/gr/awssupport.html
    #
    # aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html

    partition = get_partition(manager.config.region)
    # iso partitions have no support endpoint -> None
    return {
        "aws": "us-east-1",
        "aws-us-gov": "us-gov-west-1",
        "aws-cn": "cn-north-1",
    }.get(partition)
def get_resource_tagging_region(resource_type, region):
    # For global resources, tags don't populate in the get_resources call
    # unless the call is being made to us-east-1. For govcloud this is us-gov-west-1.

    partition = get_partition(region)
    is_global = getattr(resource_type, 'global_resource', None)
    if partition == "aws":
        return 'us-east-1' if is_global else region
    if partition == "aws-us-gov":
        return 'us-gov-west-1' if is_global else region
    return region
def get_eni_resource_type(eni):
    """Classify an elastic network interface to the resource type owning it.

    Attached instances classify as 'ec2'; otherwise the interface's
    Description is matched against well known service markers.
    Unrecognized interfaces return 'unknown'.
    """
    if eni.get('Attachment'):
        instance_id = eni['Attachment'].get('InstanceId')
    else:
        instance_id = None
    # EC2
    if instance_id:
        return 'ec2'
    # bugfix: tolerate interfaces without a Description (previously
    # None.startswith(...) raised AttributeError)
    description = eni.get('Description') or ''
    # descriptions matched by equality
    exact = {
        'ENI managed by APIGateway': 'apigw',
        'DMSNetworkInterface': 'dms',
        'RDSNetworkInterface': 'rds',
        'RedshiftNetworkInterface': 'redshift',
    }
    if description in exact:
        return exact[description]
    # prefix table; order matters ('ELB app/' must precede 'ELB')
    prefixes = (
        # ELB/ELBv2
        ('ELB app/', 'elb-app'),
        ('ELB net/', 'elb-net'),
        ('ELB gwy/', 'elb-gwy'),
        ('ELB', 'elb'),
        # Other Resources
        ('AWS CodeStar Connections', 'codestar'),
        ('DAX', 'dax'),
        ('AWS created network interface for directory', 'dir'),
        ('arn:aws:ecs:', 'ecs'),
        ('EFS mount target for', 'fsmt'),
        ('ElastiCache', 'elasticache'),
        ('AWS ElasticMapReduce', 'emr'),
        ('CloudHSM Managed Interface', 'hsm'),
        ('CloudHsm ENI', 'hsmv2'),
        ('AWS Lambda VPC ENI', 'lambda'),
        ('AWS Lambda VPC', 'lambda'),
        ('Interface for NAT Gateway', 'nat'),
        ('Network interface for DBProxy', 'rds'),
        ('Network Interface for Transit Gateway Attachment', 'tgw'),
        ('VPC Endpoint Interface', 'vpce'),
        ('aws-k8s-branch-eni', 'eks'),
    )
    for prefix, rtype in prefixes:
        if description.startswith(prefix):
            return rtype
    return 'unknown'
class C7NJmespathFunctions(functions.Functions):
    """Extra jmespath functions available to custodian expressions."""

    @functions.signature(
        {'types': ['string']}, {'types': ['string']}
    )
    def _func_split(self, sep, string):
        # split(sep, string) -> list of substrings
        return string.split(sep)

    @functions.signature(
        {'types': ['string']}
    )
    def _func_from_json(self, string):
        # from_json(string) -> parsed value, or null on invalid json
        try:
            return json.loads(string)
        except json.JSONDecodeError:
            return None
class C7NJMESPathParser(Parser):
    """Parser whose results search with custodian's custom functions."""

    def parse(self, expression):
        result = super().parse(expression)
        # rewrap so search() defaults to our custom function options
        return ParsedResultWithOptions(
            expression=result.expression,
            parsed=result.parsed
        )
class ParsedResultWithOptions(ParsedResult):
    """ParsedResult that injects custodian's custom functions by default."""

    def search(self, value, options=None):
        # if options are explicitly passed in, we honor those
        if not options:
            options = jmespath.Options(custom_functions=C7NJmespathFunctions())
        return super().search(value, options)
def jmespath_search(*args, **kwargs):
    """jmespath.search with custodian's custom functions enabled."""
    return jmespath.search(
        *args,
        **kwargs,
        options=jmespath.Options(custom_functions=C7NJmespathFunctions())
    )
def get_path(path: str, resource: dict):
    """
    This function provides a wrapper to obtain a value from a resource
    in an efficient manner.
    jmespath_search is expensive and it's rarely the case that
    there is a path in the id field, therefore this wrapper is an optimisation.

    :param path: the path or field name to fetch
    :param resource: the resource instance description
    :return: the field/path value
    """
    # plain field names skip the jmespath machinery entirely
    if '.' not in path:
        return resource[path]
    return jmespath_search(path, resource)
def jmespath_compile(expression):
    """Compile *expression* with custodian's extended jmespath parser."""
    parsed = C7NJMESPathParser().parse(expression)
    return parsed