Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/utils.py: 8%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3import copy
4from collections import UserString
5from datetime import datetime, timedelta
6from dateutil.tz import tzutc
7import json
8import itertools
9import ipaddress
10import logging
11import os
12import random
13import re
14import sys
15import threading
16import time
17from urllib import parse as urlparse
18from urllib.request import getproxies, proxy_bypass
20from dateutil.parser import ParserError, parse
22import importlib.resources
23import jmespath
24from jmespath import functions
25from jmespath.parser import Parser, ParsedResult
27from c7n import config
28from c7n.exceptions import ClientError, PolicyValidationError
# Try to play nice in a serverless environment, where we don't require yaml

try:
    import yaml
except ImportError:  # pragma: no cover
    # yaml is optional; the yaml_load/yaml_dump helpers below raise
    # RuntimeError when it is missing.
    SafeLoader = BaseSafeDumper = yaml = None
else:
    try:
        # prefer the libyaml C-accelerated loader/dumper when available
        from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper
    except ImportError:  # pragma: no cover
        from yaml import SafeLoader, SafeDumper as BaseSafeDumper
class SafeDumper(BaseSafeDumper or object):
    """Safe YAML dumper that never emits anchors/aliases for shared objects.

    Falls back to ``object`` as a base class when yaml is unavailable so
    this module can still be imported.
    """
    def ignore_aliases(self, data):
        return True
log = logging.getLogger('custodian.utils')


class VarsSubstitutionError(Exception):
    """Raised when {}-style variable substitution into a file fails."""
    pass
def load_file(path, format=None, vars=None):
    """Load a yaml or json file, optionally expanding {}-style variables.

    :param path: file path; a ``.json`` extension forces json parsing.
    :param format: 'yaml' (default) or 'json'; sniffed from the extension
        when not given.
    :param vars: optional mapping substituted into the raw contents via
        ``str.format`` before parsing.
    :raises VarsSubstitutionError: when a substitution placeholder cannot
        be satisfied from ``vars``.
    """
    if format is None:
        format = 'yaml'
        _, ext = os.path.splitext(path)
        if ext[1:] == 'json':
            format = 'json'

    with open(path) as fh:
        contents = fh.read()

    if vars:
        try:
            contents = contents.format(**vars)
        except IndexError as e:
            # a bare '{}' placeholder in the content cannot be satisfied
            msg = 'Failed to substitute variable by positional argument.'
            raise VarsSubstitutionError(msg) from e
        except KeyError as e:
            msg = 'Failed to substitute variables. KeyError on {}'.format(str(e))
            raise VarsSubstitutionError(msg) from e

    if format == 'yaml':
        return yaml_load(contents)
    elif format == 'json':
        return loads(contents)
def yaml_load(value):
    """Deserialize a YAML document using the safe loader."""
    if yaml is not None:
        return yaml.load(value, Loader=SafeLoader)
    raise RuntimeError("Yaml not available")
def yaml_dump(value):
    """Serialize *value* as block-style YAML without anchors/aliases."""
    if yaml is not None:
        return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)
    raise RuntimeError("Yaml not available")
def loads(body):
    """Deserialize a json string."""
    return json.loads(body)
def dumps(data, fh=None, indent=0):
    """Serialize *data* to json; write to *fh* when given, else return a string."""
    if not fh:
        return json.dumps(data, cls=JsonEncoder, indent=indent)
    return json.dump(data, fh, cls=JsonEncoder, indent=indent)
def format_event(evt):
    """Render an event structure as pretty-printed json."""
    return json.dumps(evt, indent=2)
def filter_empty(d):
    """Remove falsy-valued keys from *d* in place and return it."""
    empties = [k for k, v in d.items() if not v]
    for k in empties:
        del d[k]
    return d
# We need a minimum floor when examining possible timestamp
# values to distinguish from other numeric time usages. Use
# the S3 Launch Date.
# NOTE: expressed in local-time seconds since epoch (time.mktime).
DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))
def parse_date(v, tz=None):
    """Handle various permutations of a datetime serialization
    to a datetime with the given timezone.

    Handles strings, seconds since epoch, and milliseconds since epoch.

    Returns None when *v* cannot be interpreted as a datetime.
    """
    if v is None:
        return v

    tz = tz or tzutc()

    if isinstance(v, datetime):
        # NOTE(review): astimezone() on a naive datetime interprets it as
        # local time before converting -- confirm that is the intent here.
        if v.tzinfo is None:
            return v.astimezone(tz)
        return v

    # non-numeric strings: let dateutil take a shot first
    if isinstance(v, str) and not v.isdigit():
        try:
            return parse(v).astimezone(tz)
        except (AttributeError, TypeError, ValueError, OverflowError):
            pass

    # OSError on windows -- https://bugs.python.org/issue36439
    exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError)

    # seconds-since-epoch; DATE_FLOOR filters out small numbers that are
    # not plausibly timestamps.
    if isinstance(v, (int, float, str)):
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v)).astimezone(tz)
        except exceptions:
            pass

    # still unconverted (fromtimestamp overflowed or was skipped)
    if isinstance(v, (int, float, str)):
        # try interpreting as milliseconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
        except exceptions:
            pass

    return isinstance(v, datetime) and v or None
def type_schema(
        type_name, inherits=None, rinherit=None,
        aliases=None, required=None, **props):
    """jsonschema generation helper

    params:
      - type_name: name of the type
      - inherits: list of document fragments that are required via anyOf[$ref]
      - rinherit: use another schema as a base for this, basically work around
        inherits issues with additionalProperties and type enums.
      - aliases: additional names this type maybe called
      - required: list of required properties, by default 'type' is required
      - props: additional key value properties
    """
    type_names = [type_name]
    if aliases:
        type_names.extend(aliases)

    if rinherit:
        s = copy.deepcopy(rinherit)
        s['properties']['type'] = {'enum': type_names}
    else:
        s = {
            'type': 'object',
            'properties': {
                'type': {'enum': type_names}}}

    # Ref based inheritance and additional properties don't mix well.
    # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
    if not inherits:
        s['additionalProperties'] = False

    s['properties'].update(props)

    # a None property value is a request to elide that key from the schema
    for k, v in props.items():
        if v is None:
            del s['properties'][k]

    if not required:
        required = []
    if isinstance(required, list):
        # copy rather than append in place so the caller's list is not mutated
        required = required + ['type']
    s['required'] = required

    if inherits:
        extended = s
        s = {'allOf': [{'$ref': i} for i in inherits]}
        s['allOf'].append(extended)
    return s
class JsonEncoder(json.JSONEncoder):
    """json encoder aware of datetimes, FormatDate wrappers and bytes."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FormatDate):
            return obj.datetime.isoformat()
        if isinstance(obj, bytes):
            return obj.decode('utf8', errors="ignore")
        return super().default(obj)
def group_by(resources, key):
    """Return a mapping of key value to resources with the corresponding value.

    Key may be specified as dotted form for nested dictionary lookup
    """
    grouped = {}
    path = key.split('.')
    for resource in resources:
        value = resource
        for part in path:
            value = value.get(part)
            # stop descending once we hit a leaf (or a missing key)
            if not isinstance(value, dict):
                break
        grouped.setdefault(value, []).append(resource)
    return grouped
def chunks(iterable, size=50):
    """Break an iterable into lists of size"""
    buf = []
    for item in iterable:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []
    # flush any remainder shorter than size
    if buf:
        yield buf
def camelResource(obj, implicitDate=False, implicitTitle=True):
    """Some sources from apis return lowerCased where as describe calls
    always return TitleCase, this function turns the former to the later

    implicitDate ~ automatically sniff keys that look like isoformat date strings
    and convert to python datetime objects.

    Mutates *obj* (and any nested dicts/lists) in place and returns it.
    """
    if not isinstance(obj, dict):
        return obj
    for k in list(obj.keys()):
        v = obj.pop(k)
        if implicitTitle:
            # uppercase only the first character; the rest is preserved
            ok = "%s%s" % (k[0].upper(), k[1:])
        else:
            ok = k
        obj[ok] = v

        if implicitDate:
            # config service handles datetime differently then describe sdks
            # the sdks use knowledge of the shape to support language native
            # date times, while config just turns everything into a serialized
            # json with mangled keys without type info. to normalize to describe
            # we implicitly sniff keys which look like datetimes, and have an
            # isoformat marker ('T').
            kn = k.lower()
            if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
                try:
                    dv = parse_date(v)
                except ParserError:
                    dv = None
                if dv:
                    obj[ok] = dv
        # recurse into nested containers so the whole structure is normalized
        if isinstance(v, dict):
            camelResource(v, implicitDate, implicitTitle)
        elif isinstance(v, list):
            for e in v:
                camelResource(e, implicitDate, implicitTitle)
    return obj
def get_account_id_from_sts(session):
    """Return the account id for the session's credentials via sts."""
    identity = session.client('sts').get_caller_identity()
    return identity.get('Account')
def get_account_alias_from_sts(session):
    """Return the first configured account alias, or '' when there is none."""
    response = session.client('iam').list_account_aliases()
    aliases = response.get('AccountAliases', ())
    return aliases[0] if aliases else ''
def query_instances(session, client=None, **query):
    """Return a list of ec2 instances for the query.
    """
    if client is None:
        client = session.client('ec2')
    paginator = client.get_paginator('describe_instances')
    pages = paginator.paginate(**query)
    # flatten pages -> reservations -> instances
    return [
        instance
        for page in pages
        for reservation in page['Reservations']
        for instance in reservation['Instances']]
CONN_CACHE = threading.local()


def local_session(factory, region=None):
    """Cache a session thread local for up to 45m"""
    cache_key = region or getattr(factory, 'region', 'global')
    cached = getattr(CONN_CACHE, cache_key, {})
    session = cached.get('session')
    created = cached.get('time')

    now = time.time()
    # reuse a cached session while it is younger than 45 minutes
    if session is not None and created + (60 * 45) > now:
        return session
    session = factory()

    setattr(CONN_CACHE, cache_key, {'session': session, 'time': now})
    return session
def reset_session_cache():
    """Drop all cached sessions and close custodian's wrapped sessions."""
    for name in dir(CONN_CACHE):
        if name.startswith('_'):
            continue
        setattr(CONN_CACHE, name, {})

    from .credentials import CustodianSession
    CustodianSession.close()
def annotation(i, k):
    """Return the annotation stored under key *k*, or an empty tuple."""
    return i.get(k, ())
def set_annotation(i, k, v):
    """
    >>> x = {}
    >>> set_annotation(x, 'marker', 'a')
    >>> annotation(x, 'marker')
    ['a']
    """
    if not isinstance(i, dict):
        raise ValueError("Can only annotate dictionaries")

    values = v if isinstance(v, list) else [v]

    if k not in i:
        i[k] = values
    else:
        existing = i.get(k)
        # silently leave non-list existing values untouched
        if isinstance(existing, list):
            existing.extend(values)
def parse_s3(s3_path):
    """Split an s3:// url into (path, bucket, key_prefix)."""
    if not s3_path.startswith('s3://'):
        raise ValueError("invalid s3 path")
    slash = s3_path.find('/', 5)
    if slash == -1:
        slash = None
    bucket = s3_path[5:slash]
    s3_path = s3_path.rstrip('/')
    if slash is None:
        key_prefix = ""
    else:
        key_prefix = s3_path[s3_path.find('/', 5):]
    return s3_path, bucket, key_prefix
# region name -> partition (aws / aws-cn / aws-us-gov / ...) lookup table,
# shipped as package data so it can be refreshed without code changes.
REGION_PARTITION_MAP = json.loads(
    (importlib.resources.files('c7n') / 'data/aws_region_partition_map.json').read_text()
)
def get_partition(region):
    """Map an aws region name to its partition; unknown regions get 'aws'."""
    if region:
        return REGION_PARTITION_MAP.get(region, 'aws')
    return ''
def generate_arn(
        service, resource, partition='aws',
        region=None, account_id=None, resource_type=None, separator='/'):
    """Generate an Amazon Resource Name.
    See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
    """
    if region:
        # a region implies its partition, overriding the argument
        partition = get_partition(region)
    if service == 's3':
        # s3 arns are global and carry no region component
        region = ''
    arn = 'arn:%s:%s:%s:%s:' % (
        partition, service, region or '', account_id or '')
    if resource_type:
        if resource.startswith(separator):
            separator = ''
        arn += '%s%s%s' % (resource_type, separator, resource)
    else:
        arn += resource
    return arn
def snapshot_identifier(prefix, db_identifier):
    """Return an identifier for a snapshot of a database or cluster.
    """
    stamp = datetime.now().strftime('%Y-%m-%d-%H-%M')
    return '%s-%s-%s' % (prefix, db_identifier, stamp)
retry_log = logging.getLogger('c7n.retry')


def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
    """Decorator for retry boto3 api call on transient errors.

    https://www.awsarchitectureblog.com/2015/03/backoff.html
    https://en.wikipedia.org/wiki/Exponential_backoff

    :param codes: A sequence of retryable error codes.
    :param max_attempts: The max number of retries, by default the delay
           time is proportional to the max number of attempts.
    :param log_retries: Whether we should log retries, if specified
           specifies the level at which the retry should be logged.
    :param _max_delay: The maximum delay for any retry interval *note*
           this parameter is only exposed for unit testing, as its
           derived from the number of attempts.

    Returns a function for invoking aws client calls that
    retries on retryable error codes.
    """
    # upper bound for the geometric backoff sequence below
    max_delay = max(min_delay, 2) ** max_attempts

    def _retry(func, *args, ignore_err_codes=(), **kw):
        for idx, delay in enumerate(
                backoff_delays(min_delay, max_delay, jitter=True)):
            try:
                return func(*args, **kw)
            except ClientError as e:
                # ignored codes short-circuit with a None result
                if e.response['Error']['Code'] in ignore_err_codes:
                    return
                # non-retryable codes propagate immediately
                elif e.response['Error']['Code'] not in retry_codes:
                    raise
                # give up after the final attempt
                elif idx == max_attempts - 1:
                    raise
                if log_retries:
                    # log_retries doubles as the logging level
                    retry_log.log(
                        log_retries,
                        "retrying %s on error:%s attempt:%d last delay:%0.2f",
                        func, e.response['Error']['Code'], idx, delay)
            time.sleep(delay)
    return _retry
def backoff_delays(start, stop, factor=2.0, jitter=False):
    """Geometric backoff sequence w/ jitter
    """
    delay = start
    while delay <= stop:
        if jitter:
            # shave off up to 20% of the nominal delay
            yield delay - (delay * random.random() / 5)
        else:
            yield delay
        delay *= factor
def parse_cidr(value):
    """Process cidr ranges."""
    if isinstance(value, (list, set)):
        return IPv4List([parse_cidr(item) for item in value])
    klass = IPv4Network if '/' in value else ipaddress.ip_address
    try:
        return klass(str(value))
    except (ipaddress.AddressValueError, ValueError):
        return None
class IPv4Network(ipaddress.IPv4Network):
    """IPv4Network where ``in`` also answers net-to-net containment."""

    # Override for net 2 net containment comparison
    def __contains__(self, other):
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            # a network is "in" us when we are its supernet
            return self.supernet_of(other)
        return super(IPv4Network, self).__contains__(other)

    # stdlib grew supernet_of in 3.7; backfill it for 3.6 and earlier
    if (sys.version_info.major == 3 and sys.version_info.minor <= 6):  # pragma: no cover
        @staticmethod
        def _is_subnet_of(a, b):
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (b.network_address <= a.network_address and
                        b.broadcast_address >= a.broadcast_address)
            except AttributeError:
                raise TypeError(f"Unable to test subnet containment "
                                f"between {a} and {b}")

        def supernet_of(self, other):
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)
class IPv4List:
    """A collection of parsed networks/addresses supporting membership tests."""

    def __init__(self, ipv4_list):
        self.ipv4_list = ipv4_list

    def __contains__(self, other):
        if other is None:
            return False
        for element in self.ipv4_list:
            # networks match by containment, addresses by equality
            if isinstance(element, IPv4Network) and other in element:
                return True
            if isinstance(element, ipaddress.IPv4Address) and other == element:
                return True
        return False
def reformat_schema(model):
    """ Reformat schema to be in a more displayable format. """
    if not hasattr(model, 'schema'):
        return "Model '{}' does not have a schema".format(model)

    if 'properties' not in model.schema:
        return "Schema in unexpected format."

    ret = copy.deepcopy(model.schema['properties'])

    ret.pop('type', None)

    # flag required properties inline for display
    for key in model.schema.get('required', []):
        if key in ret:
            ret[key]['required'] = True

    return ret
# from botocore.utils avoiding runtime dependency for botocore for other providers.
# license apache 2.0
def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set *value* into *source* at the dotted-path *expression*.

    Limitations:
      * Only handles dotted lookups
      * No offsets/wildcards/slices/etc.
    Intermediate dictionaries are created as needed.
    """
    key, _, rest = expression.partition('.')

    if not key:
        raise ValueError(expression)

    if rest:
        # descend, creating an empty dict for keys not yet present
        node = source.setdefault(key, {})
        return set_value_from_jmespath(node, rest, value, is_first=False)

    # single remaining key: assign directly
    source[key] = value
def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
    """
    Format all string values in an object.
    Return the updated object
    """
    if isinstance(obj, dict):
        return {
            key: format_string_values(obj[key], formatter=formatter, *args, **kwargs)
            for key in obj.keys()}
    elif isinstance(obj, list):
        return [
            format_string_values(item, formatter=formatter, *args, **kwargs)
            for item in obj]
    elif isinstance(obj, str):
        try:
            if formatter:
                return formatter(obj, *args, **kwargs)
            return obj.format(*args, **kwargs)
        except err_fallback:
            # unresolvable placeholders pass through unchanged
            return obj
    return obj
def parse_url_config(url):
    """Parse a url into a config Bag of scheme/netloc/path plus query params."""
    if url and '://' not in url:
        # bare scheme names are normalized so urlparse sees a scheme
        url += "://"
    conf = config.Bag()
    parsed = urlparse.urlparse(url)
    conf['scheme'] = parsed.scheme
    conf['netloc'] = parsed.netloc
    conf['path'] = parsed.path
    for key, values in urlparse.parse_qs(parsed.query).items():
        conf[key] = values[0]
    conf['url'] = url
    return conf
def join_output_path(output_path, *parts):
    """Join path segments onto an output location (filesystem path or url)."""
    # interpolated output paths are passed through untouched
    if '{' in output_path:
        return output_path

    if "://" not in output_path:
        return os.path.join(output_path, *parts)

    # urls (possibly with query strings): extend only the path component
    parsed = urlparse.urlparse(output_path)
    elements = list(parsed)
    elements[2] = "/".join((parsed.path, *parts))
    return urlparse.urlunparse(elements)
def get_policy_provider(policy_data):
    """Derive the cloud provider name from a policy's resource type."""
    resource = policy_data['resource']
    if isinstance(resource, list):
        return resource[0].split('.', 1)[0]
    if '.' in resource:
        return resource.split('.', 1)[0]
    # unprefixed resource types default to aws
    return 'aws'
def get_proxy_url(url):
    """Resolve the proxy url configured in the environment for *url*.

    Returns None when no proxy applies or the host is covered by no_proxy.
    """
    proxies = getproxies()
    parsed = urlparse.urlparse(url)

    # most specific to least specific proxy configuration keys
    candidate_keys = [
        parsed.scheme + '://' + parsed.netloc,
        parsed.scheme,
        'all://' + parsed.netloc,
        'all'
    ]

    # Set port if not defined explicitly in url.
    port = parsed.port
    if port is None:
        if parsed.scheme == 'http':
            port = 80
        elif parsed.scheme == 'https':
            port = 443

    hostname = parsed.hostname if parsed.hostname is not None else ''

    # Determine if proxy should be used based on no_proxy entries.
    # Note this does not support no_proxy ip or cidr entries.
    if proxy_bypass("%s:%s" % (hostname, port)):
        return None

    for key in candidate_keys:
        if key in proxies:
            return proxies[key]

    return None
class DeferredFormatString(UserString):
    """A string that returns itself when formatted

    Let any format spec pass through. This lets us selectively defer
    expansion of runtime variables without losing format spec details.
    """
    def __format__(self, format_spec):
        spec = f":{format_spec}" if format_spec else ""
        return "{" + self.data + spec + "}"
class FormatDate:
    """a datetime wrapper with extended pyformat syntax"""

    # offset tokens embedded in a format spec, e.g. '+2d' or '+30M'
    date_increment = re.compile(r'\+[0-9]+[Mdh]')

    def __init__(self, d=None):
        self._d = d

    def __str__(self):
        return str(self._d)

    @property
    def datetime(self):
        return self._d

    @classmethod
    def utcnow(cls):
        return cls(datetime.utcnow())

    def __getattr__(self, k):
        # delegate everything else (year, strftime, ...) to the wrapped datetime
        return getattr(self._d, k)

    def __format__(self, fmt=None):
        value = self._d
        offsets = self.date_increment.findall(fmt)
        unit_names = {'M': 'minutes', 'h': 'hours', 'd': 'days'}
        for token in offsets:
            delta = {unit_names[token[-1]]: float(token[1:-1])}
            value = value + timedelta(**delta)
        if offsets:
            # strip the offset tokens before delegating to datetime formatting
            fmt = self.date_increment.sub("", fmt)
        return value.__format__(fmt)
class QueryParser:
    """Validate user supplied api query/filter structures against a schema.

    Subclasses set ``QuerySchema`` (key -> type / tuple of allowed values,
    optionally a nested 'Filters' mapping) and ``type_name`` for messages.
    """

    QuerySchema = {}
    type_name = ''
    # Allow multiple values to be passed to a query param
    multi_value = True
    # If using multi_value, specify scalar fields here
    single_value_fields = ()

    @classmethod
    def is_implicit_query_filter(cls, data):
        # a leading key that is only valid under 'Filters' (or a tag:) marks
        # the whole structure as shorthand filter syntax
        key = list(data[0].keys())[0]
        if (key not in cls.QuerySchema and 'Filters' in cls.QuerySchema and
                (key in cls.QuerySchema['Filters'] or key.startswith('tag:'))):
            return True
        return False

    @classmethod
    def implicit_qfilter_translate(cls, data):
        # expand [{k: v}, ...] shorthand into canonical
        # [{'Filters': [{'Name': k, 'Values': [v]}, ...]}]
        filters = []
        for d in data:
            key = list(d.keys())[0]
            values = list(d.values())[0]
            if not isinstance(values, list):
                values = [values]
            filters.append({'Name': key, 'Values': values})
        return [{'Filters': filters}]

    @classmethod
    def parse(cls, data):
        """Validate and normalize *data*; raises PolicyValidationError."""
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Format, must be array of dicts"
            )

        # Backwards compatibility
        if data:
            if not isinstance(data[0], dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Format, must be array of dicts"
                )
            # Check for query filter key value pairs not listed under 'Filters' key
            if cls.is_implicit_query_filter(data):
                data = cls.implicit_qfilter_translate(data)

            # Support iam-policy and elasticache 'Name', 'Value' queries without 'Filters' key
            if (data[0].get('Value') and
                    (cls.type_name == 'IAM Policy' or cls.type_name == 'ElastiCache')):
                try:
                    data = [{d['Name']: d['Value']} for d in data]
                except KeyError:
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Format. "
                        f"Query: {data} is not a list of key-value pairs "
                        f"from {cls.QuerySchema}"
                    )

            # Support ebs-snapshot and volume 'Name', 'Values' queries without 'Filters' key
            elif (data[0].get('Values') and
                    (cls.type_name == 'EBS Snapshot' or
                     cls.type_name == 'EBS Volume')):
                data = [{"Filters": data}]

        results = []
        names = set()
        for d in data:
            if not isinstance(d, dict):
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )
            # each entry must be a single key/value pair
            if not len(list(d.keys())) == 1:
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )

            if d.get("Filters"):
                results.append({"Filters": cls.parse_qfilters(d["Filters"])})
            else:
                key, value = cls.parse_query(d)

                # Allow for multiple queries with the same key
                if key in names and (not cls.multi_value or key in cls.single_value_fields):
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Key: {key} Must be unique")
                elif key in names:
                    # merge repeated keys into the existing entry's value list
                    for q in results:
                        if list(q.keys())[0] == key:
                            q[key].append(d[key])
                else:
                    names.add(key)
                    results.append({key: value})

        return results

    @classmethod
    def parse_qfilters(cls, data):
        """Validate a canonical [{'Name':..., 'Values': [...]}] filter list."""
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
            )

        results = []
        names = set()
        for f in data:
            if not isinstance(f, dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
                )
            if "Name" not in f or "Values" not in f:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid: Each filter must "
                    "contain 'Name' and 'Values' keys."
                )

            key = f['Name']
            values = f['Values']

            # tag:* keys are always allowed in addition to the schema keys
            if key not in cls.QuerySchema.get("Filters", {}) and not key.startswith('tag:'):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Key: {key} "
                    f"Valid: {', '.join(cls.QuerySchema.keys())}"
                )

            if not isinstance(values, list):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Value {f} for key {key}, must be array.")

            vtype = cls.QuerySchema["Filters"].get(key)
            if vtype is None and key.startswith('tag'):
                vtype = str

            for v in values:
                cls.type_check(vtype, v)

            # Allow for multiple queries with the same key
            if key in names:
                for qf in results:
                    if qf['Name'] == key:
                        qf['Values'].extend(values)
            else:
                names.add(key)
                results.append({'Name': key, 'Values': values})

        return results

    @classmethod
    def parse_query(cls, data):
        """Validate a single {key: value(s)} query entry; returns (key, values)."""
        key = list(data.keys())[0]
        values = list(data.values())[0]

        if (not cls.multi_value or key in cls.single_value_fields) and isinstance(values, list):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value {values}: Value for {key} must be scalar"
            )
        elif (cls.multi_value and key not in cls.single_value_fields
                and not isinstance(values, list)):
            # normalize scalars to a one-element list for multi-value keys
            values = [values]

        if key not in cls.QuerySchema:
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Key: {key} "
                f"Valid: {', '.join(cls.QuerySchema.keys())}"
            )

        vtype = cls.QuerySchema.get(key)
        if isinstance(values, list):
            for v in values:
                cls.type_check(vtype, v)
        else:
            cls.type_check(vtype, values)

        return key, values

    @classmethod
    def type_check(cls, vtype, value):
        """Check *value* against a schema type: enum tuple, 'date', or a class."""
        if isinstance(vtype, tuple):
            # tuple schema types are enumerations of allowed values
            if value not in vtype:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Value: {value} Valid: {', '.join(vtype)}")
        elif vtype == 'date':
            if not parse_date(value):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Date Value: {value}")
        elif not isinstance(value, vtype):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value Type {value}"
            )
def get_annotation_prefix(s):
    """Return the custodian annotation key for *s* ('c7n:' prefixed)."""
    return f'c7n:{s}'
def merge_dict_list(dict_iter):
    """take an list of dictionaries and merge them.

    last dict wins/overwrites on keys.
    """
    merged = {}
    for item in dict_iter:
        merged.update(item)
    return merged
def merge_dict(a, b):
    """Perform a merge of dictionaries A and B

    Any subdictionaries will be recursively merged.
    Any leaf elements in the form of scalar will use the value from B.
    If A is a str and B is a list, A will be inserted into the front of the list.
    If A is a list and B is a str, B will be appended to the list.
    If there are two lists for the same key, the lists will be merged
    deduplicated with values in A first, followed by any additional values from B.
    """
    merged = copy.deepcopy(a)
    for key, bval in b.items():
        if key not in merged:
            merged[key] = bval
            continue
        aval = merged[key]
        if isinstance(aval, dict) and isinstance(bval, dict):
            merged[key] = merge_dict(aval, bval)
        elif isinstance(aval, list) and isinstance(bval, list):
            # append only values not already present, preserving order
            for item in bval:
                if item not in aval:
                    aval.append(item)
        elif isinstance(bval, str) and isinstance(aval, list):
            if bval not in aval:
                aval.append(bval)
        elif isinstance(bval, list) and isinstance(aval, str):
            if aval in bval:
                merged[key] = bval
            else:
                merged[key] = [aval]
                merged[key].extend(bval)
        elif isinstance(bval, (int, str, float, bool)):
            merged[key] = bval
        else:
            raise Exception(f"k={key}, {type(bval)} and {type(aval)} not conformable.")
    return merged
def compare_dicts_using_sets(a, b) -> bool:
    """Compares two dicts and replaces any lists or strings with sets

    Compares any lists in the dict as sets. Note: list/str values in *a*
    are converted to sets in place as a side effect.
    """
    if a.keys() != b.keys():
        return False

    for k, bval in b.items():
        # normalize both sides to sets for order-insensitive comparison
        if isinstance(bval, (list, str)):
            bval = format_to_set(bval)
        if isinstance(a[k], (str, list)):
            a[k] = format_to_set(a[k])
        if isinstance(a[k], dict) and isinstance(bval, dict):
            if compare_dicts_using_sets(a[k], bval):
                continue
        if bval != a[k]:
            return False
    return True
def format_to_set(x) -> set:
    """Formats lists and strings to sets.

    Strings return as a set with one string.
    Lists return as a set.
    Variables of other datatypes will return as the original datatype.
    """
    if isinstance(x, str):
        return {x}
    if isinstance(x, list):
        return set(x)
    return x
def format_dict_with_sets(x: dict) -> dict:
    """Formats string and list values in a dict to sets.

    Any string value returns as a set with one string.
    Any list values return as a set.
    Returns a formatted dict; non-dict inputs pass through unchanged.
    """
    if not isinstance(x, dict):
        return x
    return {
        key: format_dict_with_sets(value) if isinstance(value, dict)
        else format_to_set(value)
        for key, value in x.items()}
def select_keys(d, keys):
    """Project *d* onto *keys*; missing keys map to None."""
    return {k: d.get(k) for k in keys}
def get_human_size(size, precision=2):
    """Render a byte count as a human readable string ('2.00 KB')."""
    # interesting discussion on 1024 vs 1000 as base
    # https://en.wikipedia.org/wiki/Binary_prefix
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    idx = 0
    while size > 1024:
        idx += 1
        size /= 1024.0
    return "%.*f %s" % (precision, size, suffixes[idx])
def get_support_region(manager):
    """Return the single region hosting the aws support api for a partition."""
    # support is a unique service in that it doesnt support regional endpoints
    # thus, we need to construct the client based off the regions found here:
    # https://docs.aws.amazon.com/general/latest/gr/awssupport.html
    #
    # aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html
    partition = get_partition(manager.config.region)
    return {
        'aws': 'us-east-1',
        'aws-us-gov': 'us-gov-west-1',
        'aws-cn': 'cn-north-1',
    }.get(partition)
def get_resource_tagging_region(resource_type, region):
    """Return the region to use for the resourcegroupstagging api."""
    # For global resources, tags don't populate in the get_resources call
    # unless the call is being made to us-east-1. For govcloud this is us-gov-west-1.
    partition = get_partition(region)
    home = {'aws': 'us-east-1', 'aws-us-gov': 'us-gov-west-1'}.get(partition)
    if home and getattr(resource_type, 'global_resource', None):
        return home
    return region
def get_eni_resource_type(eni):
    """Classify the service that owns a network interface.

    Attached instances are 'ec2'; otherwise the interface description is
    pattern-matched. Returns 'unknown' when nothing matches.
    """
    attachment = eni.get('Attachment') or {}
    if attachment.get('InstanceId'):
        return 'ec2'
    description = eni.get('Description')
    # descriptions matched by exact string
    exact = {
        'ENI managed by APIGateway': 'apigw',
        'DMSNetworkInterface': 'dms',
        'RDSNetworkInterface': 'rds',
        'RedshiftNetworkInterface': 'redshift',
    }
    if description in exact:
        return exact[description]
    # descriptions matched by prefix -- order matters, most specific first
    prefixes = (
        ('ELB app/', 'elb-app'),
        ('ELB net/', 'elb-net'),
        ('ELB gwy/', 'elb-gwy'),
        ('ELB', 'elb'),
        ('AWS CodeStar Connections', 'codestar'),
        ('DAX', 'dax'),
        ('AWS created network interface for directory', 'dir'),
        ('arn:aws:ecs:', 'ecs'),
        ('EFS mount target for', 'fsmt'),
        ('ElastiCache', 'elasticache'),
        ('AWS ElasticMapReduce', 'emr'),
        ('CloudHSM Managed Interface', 'hsm'),
        ('CloudHsm ENI', 'hsmv2'),
        ('AWS Lambda VPC ENI', 'lambda'),
        ('AWS Lambda VPC', 'lambda'),
        ('Interface for NAT Gateway', 'nat'),
        ('Network interface for DBProxy', 'rds'),
        ('Network Interface for Transit Gateway Attachment', 'tgw'),
        ('VPC Endpoint Interface', 'vpce'),
        ('aws-k8s-branch-eni', 'eks'),
    )
    for prefix, rtype in prefixes:
        if description.startswith(prefix):
            return rtype
    return 'unknown'
class C7NJmespathFunctions(functions.Functions):
    """Custom jmespath functions available in custodian expressions."""

    @functions.signature(
        {'types': ['string']}, {'types': ['string']}
    )
    def _func_split(self, sep, string):
        # exposed as split(sep, string) in jmespath expressions
        return string.split(sep)

    @functions.signature(
        {'types': ['string']}
    )
    def _func_from_json(self, string):
        # exposed as from_json(string); yields null on invalid json
        try:
            return json.loads(string)
        except json.JSONDecodeError:
            return None
class C7NJMESPathParser(Parser):
    """Parser whose results default to custodian's custom jmespath functions."""

    def parse(self, expression):
        result = super().parse(expression)
        # re-wrap so search() can inject custom function options by default
        return ParsedResultWithOptions(
            expression=result.expression,
            parsed=result.parsed
        )
class ParsedResultWithOptions(ParsedResult):
    """ParsedResult that injects custodian's custom functions on search."""

    def search(self, value, options=None):
        # if options are explicitly passed in, we honor those
        if not options:
            options = jmespath.Options(custom_functions=C7NJmespathFunctions())
        return super().search(value, options)
def jmespath_search(*args, **kwargs):
    """jmespath.search with custodian's custom functions enabled."""
    return jmespath.search(
        *args,
        **kwargs,
        options=jmespath.Options(custom_functions=C7NJmespathFunctions())
    )
def get_path(path: str, resource: dict):
    """
    This function provides a wrapper to obtain a value from a resource
    in an efficient manner.

    jmespath_search is expensive and it's rarely the case that
    there is a path in the id field, therefore this wrapper is an optimisation.

    :param path: the path or field name to fetch
    :param resource: the resource instance description
    :return: the field/path value
    """
    if '.' not in path:
        return resource[path]
    return jmespath_search(path, resource)
def jmespath_compile(expression):
    """Compile a jmespath expression with custodian's custom functions."""
    parsed = C7NJMESPathParser().parse(expression)
    return parsed
def snap_to_period_start(start: datetime, end: datetime, period_start: str):
    """
    Adjust the start and end timestamps according to `period_start`.

    Args:
        start (datetime): The original start datetime.
        end (datetime): The original end datetime.
        period_start (str): One of 'auto', 'start-of-day'.

    Returns:
        Tuple[datetime, datetime]: Adjusted (start, end)
    """
    if period_start != "start-of-day":
        # 'auto' (and anything else) leaves the window untouched
        return start, end
    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    return start.replace(**midnight), end.replace(**midnight)