Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/policy.py: 27%
698 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import json
5import fnmatch
6import itertools
7import logging
8import os
9import time
10from typing import List
12from dateutil import parser, tz as tzutil
14from c7n.cwe import CloudWatchEvents
15from c7n.ctx import ExecutionContext
16from c7n.exceptions import PolicyValidationError, ClientError, ResourceLimitExceeded
17from c7n.filters import FilterRegistry, And, Or, Not
18from c7n.manager import iter_filters
19from c7n.output import DEFAULT_NAMESPACE
20from c7n.resources import load_resources
21from c7n.registry import PluginRegistry
22from c7n.provider import clouds, get_resource_class
23from c7n import deprecated, utils
24from c7n.version import version
25from c7n.query import RetryPageIterator
26from c7n.varfmt import VarFormat
27from c7n.utils import get_policy_provider, jmespath_compile
29log = logging.getLogger('c7n.policy')
def load(options, path, format=None, validate=True, vars=None):
    """Load policies from a file or directory into a ``PolicyCollection``.

    :param options: run options handed to the directory loader and to
        ``PolicyCollection.from_data``.
    :param path: path of a policy file, or of a directory of policy files.
    :param format: optional format hint forwarded to ``utils.load_file``.
    :param validate: when true, run schema validation on file data and call
        ``validate()`` on every loaded policy.
    :param vars: optional variables forwarded to ``utils.load_file``.
    :returns: the loaded collection, or ``None`` when the file's top level
        is a list, the file is empty, or it has no ``policies`` entry.
    :raises IOError: if ``path`` does not exist.
    :raises PolicyValidationError: if schema validation reports errors.
    """
    # should we do os.path.expanduser here?
    if not os.path.exists(path):
        raise IOError("Invalid path for config %r" % path)

    from c7n.schema import validate as schema_validate, StructureParser
    if os.path.isdir(path):
        from c7n.loader import DirectoryLoader
        collection = DirectoryLoader(options).load_directory(path, validate)
        if validate:
            # plain loop: validate() is called for its side effects only
            for policy in collection:
                policy.validate()
        return collection

    # Any other existing path is read as a policy file.  Guarding this read
    # with os.path.isfile() would leave ``data`` unassigned for paths that
    # exist but are not regular files (e.g. a named pipe).
    data = utils.load_file(path, format=format, vars=vars)

    structure = StructureParser()
    structure.validate(data)
    rtypes = structure.get_resource_types(data)
    load_resources(rtypes)

    if isinstance(data, list):
        log.warning('yaml in invalid format. The "policies:" line is probably missing.')
        return None

    if validate:
        errors = schema_validate(data, resource_types=rtypes)
        if errors:
            raise PolicyValidationError(
                "Failed to validate policy %s \n %s" % (
                    errors[1], errors[0]))

    # Test for empty policy file
    if not data or data.get('policies') is None:
        return None

    collection = PolicyCollection.from_data(data, options)
    if validate:
        # non schema validation of policies
        for policy in collection:
            policy.validate()
    return collection
class PolicyCollection:
    """An ordered group of policies together with the options they run with.

    Supports iteration, ``len()``, membership tests by policy name,
    concatenation with ``+`` and filtering by name pattern, resource type
    and execution mode.
    """

    log = logging.getLogger('c7n.policies')

    def __init__(self, policies: 'List[Policy]', options):
        self.options = options
        self.policies = policies

    @classmethod
    def from_data(cls, data: dict, options, session_factory=None):
        """Build a collection from parsed policy-file data.

        One ``Policy`` is created per entry under ``data['policies']``; a
        missing key yields an empty collection.
        """
        # session factory param introduction needs an audit and review
        # on tests.
        factory = session_factory or cls.session_factory()
        return cls(
            [Policy(entry, options, session_factory=factory)
             for entry in data.get('policies', ())],
            options)

    def __add__(self, other):
        combined = self.policies + other.policies
        return self.__class__(combined, self.options)

    def filter(self, policy_patterns=(), resource_types=(), modes=()):
        """Return a new collection narrowed by the given criteria.

        An empty criterion places no restriction. The surviving policies
        keep the order they have in this collection.
        """
        selected = self._filter_by_modes(
            self._filter_by_resource_types(
                self._filter_by_patterns(self.policies, policy_patterns),
                resource_types),
            modes)
        # walk self.policies so the result follows the original ordering
        ordered = [policy for policy in self.policies if policy in selected]
        return PolicyCollection(ordered, self.options)

    def _filter_by_patterns(self, policies, patterns):
        """
        Keep the policies whose name matches at least one of the glob
        patterns; each policy appears once in the result.
        """
        if not patterns:
            return policies

        merged = []
        for pattern in patterns:
            for candidate in self._filter_by_pattern(policies, pattern):
                if candidate not in merged:
                    merged.append(candidate)
        return merged

    def _filter_by_pattern(self, policies, pattern):
        """
        Keep the policies whose name matches a single glob pattern, warning
        when nothing matches.
        """
        matched = [
            policy for policy in policies
            if fnmatch.fnmatch(policy.name, pattern)]

        if not matched:
            self.log.warning((
                'Policy pattern "{}" '
                'did not match any policies.').format(pattern))

        return matched

    def _filter_by_resource_types(self, policies, resource_types):
        """
        Keep the policies whose resource type is one of the given types;
        each policy appears once in the result.
        """
        if not resource_types:
            return policies

        merged = []
        for resource_type in resource_types:
            for candidate in self._filter_by_resource_type(policies, resource_type):
                if candidate not in merged:
                    merged.append(candidate)
        return merged

    def _filter_by_resource_type(self, policies, resource_type):
        """
        Keep the policies with exactly this resource type, warning when
        nothing matches.
        """
        matched = [
            policy for policy in policies
            if policy.resource_type == resource_type]

        if not matched:
            self.log.warning((
                'Resource type "{}" '
                'did not match any policies.').format(resource_type))

        return matched

    def _filter_by_modes(self, policies, modes):
        """
        Keep the policies whose execution mode type is one of the given
        modes; each policy appears once in the result.
        """
        if not modes:
            return policies

        merged = []
        for mode in modes:
            for candidate in self._filter_by_mode(policies, mode):
                if candidate not in merged:
                    merged.append(candidate)
        return merged

    def _filter_by_mode(self, policies, mode):
        """
        Keep the policies whose execution mode type equals ``mode``, warning
        when nothing matches.
        """
        matched = [
            policy for policy in policies
            if policy.get_execution_mode().type == mode]

        if not matched:
            self.log.warning((
                'Filter by modes type "{}" '
                'did not match any policies.').format(mode))

        return matched

    def __iter__(self):
        return iter(self.policies)

    def __contains__(self, policy_name):
        # membership is by policy *name*, not by policy object
        return any(policy.name == policy_name for policy in self.policies)

    def __len__(self):
        return len(self.policies)

    @property
    def resource_types(self):
        """resource types used by the collection."""
        return {policy.resource_type for policy in self.policies}

    # cli/collection tests patch this
    @classmethod
    def session_factory(cls):
        return None
class PolicyExecutionMode:
    """Base class describing how a policy is executed.

    Subclasses supply ``run`` and ``get_logs``; ``provision`` and
    ``validate`` are no-ops here.
    """

    POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')
    permissions = ()

    def __init__(self, policy):
        self.policy = policy

    def run(self, event=None, lambda_context=None):
        """Execute the policy; must be provided by a subclass."""
        raise NotImplementedError("subclass responsibility")

    def provision(self):
        """Create whatever the policy needs in order to run (no-op here)."""

    def get_logs(self, start, end):
        """Fetch the policy's logs; must be provided by a subclass."""
        raise NotImplementedError("subclass responsibility")

    def validate(self):
        """Check the execution mode's configuration (no-op here)."""

    def get_permissions(self):
        return self.permissions

    def get_metrics(self, start, end, period):
        """Fetch CloudWatch statistics for the policy's metrics.

        Covers ``POLICY_METRICS`` plus any ``metrics`` declared by the
        policy's actions and filters. A metric is either a name, or a
        ``(name, dimensions)`` pair whose dimensions are layered over the
        policy defaults.

        :returns: dict of metric name to the ``Datapoints`` of the response.
        """
        base_dimensions = {
            'Policy': self.policy.name, 'ResType': self.policy.resource_type,
            'Scope': 'Policy'}

        metric_specs = list(self.POLICY_METRICS)

        # Support action, and filter custom metrics
        for element in itertools.chain(
                self.policy.resource_manager.actions,
                self.policy.resource_manager.filters):
            if element.metrics:
                metric_specs.extend(element.metrics)

        client = utils.local_session(
            self.policy.session_factory).client('cloudwatch')

        collected = {}
        for spec in metric_specs:
            if isinstance(spec, str):
                metric_name, dimensions = spec, base_dimensions
            else:
                metric_name, extra_dimensions = spec
                dimensions = dict(base_dimensions)
                dimensions.update(extra_dimensions)
            response = client.get_metric_statistics(
                Namespace=DEFAULT_NAMESPACE,
                MetricName=metric_name,
                Dimensions=[
                    {'Name': key, 'Value': value}
                    for key, value in dimensions.items()],
                Statistics=['Sum', 'Average'],
                StartTime=start,
                EndTime=end,
                Period=period)
            collected[metric_name] = response['Datapoints']
        return collected

    def get_deprecations(self):
        # The execution mode itself doesn't have a data dict, so the mode
        # section of the policy's data dict is checked instead.
        mode_data = self.policy.data.get('mode', {})
        return deprecated.check_deprecations(self, data=mode_data)
class ServerlessExecutionMode(PolicyExecutionMode):
    """Execution mode base whose ``run``, ``provision`` and ``get_logs``
    must all be supplied by a subclass."""

    def run(self, event=None, lambda_context=None):
        """Execute the policy; must be provided by a subclass."""
        raise NotImplementedError("subclass responsibility")

    def get_logs(self, start, end):
        """Fetch the policy's logs; must be provided by a subclass."""
        raise NotImplementedError("subclass responsibility")

    def provision(self):
        """Create what the policy needs; must be provided by a subclass."""
        raise NotImplementedError("subclass responsibility")
# Registry of policy execution modes; the mode classes in this file add
# themselves to it with ``@execution.register('<mode type>')``.
303execution = PluginRegistry('c7n.execution')
@execution.register('pull')
class PullMode(PolicyExecutionMode):
    """Execution mode that fetches resources and then applies actions.

    ``run`` asks the policy's resource manager for resources, records
    count and timing metrics plus a ``resources.json`` output, and, unless
    the ``dryrun`` option is set, passes the resources to each action.
    """

    schema = utils.type_schema('pull')

    def run(self, *args, **kw):
        """Run the policy and return the resources it selected.

        Returns an empty list when the policy is not runnable or selects
        nothing. ``ResourceLimitExceeded`` is logged, counted as a metric
        and re-raised.
        """
        policy = self.policy
        if not policy.is_runnable():
            return []

        with policy.ctx as ctx:
            policy.log.debug(
                "Running policy:%s resource:%s region:%s c7n:%s",
                policy.name,
                policy.resource_type,
                policy.options.region or 'default',
                version,
            )

            fetch_started = time.time()
            try:
                resources = policy.resource_manager.resources()
            except ResourceLimitExceeded as limit_error:
                policy.log.error(str(limit_error))
                ctx.metrics.put_metric(
                    'ResourceLimitExceeded', limit_error.selection_count, "Count"
                )
                raise
            fetch_seconds = time.time() - fetch_started

            found = len(resources)
            policy.log.info(
                "policy:%s resource:%s region:%s count:%d time:%0.2f",
                policy.name,
                policy.resource_type,
                policy.options.region,
                found,
                fetch_seconds,
            )
            ctx.metrics.put_metric("ResourceCount", found, "Count", Scope="Policy")
            ctx.metrics.put_metric(
                "ResourceTime", fetch_seconds, "Seconds", Scope="Policy"
            )
            ctx.output.write_file(
                'resources.json', utils.dumps(resources, indent=2)
            )

            if not resources:
                return []

            if policy.options.dryrun:
                policy.log.debug("dryrun: skipping actions")
                return resources

            actions_started = time.time()
            for action in policy.resource_manager.actions:
                action_started = time.time()
                with ctx.tracer.subsegment('action:%s' % action.type):
                    results = action.process(resources)
                elapsed = time.time() - action_started
                policy.log.info(
                    "policy:%s action:%s"
                    " resources:%d"
                    " execution_time:%0.2f"
                    % (policy.name, action.name, len(resources), elapsed)
                )
                # only actions that return something get an output file
                if results:
                    ctx.output.write_file(
                        "action-%s" % action.name, utils.dumps(results)
                    )
            ctx.metrics.put_metric(
                "ActionTime", time.time() - actions_started, "Seconds", Scope="Policy"
            )
            return resources
class LambdaMode(ServerlessExecutionMode):
    """A policy that runs/executes in lambda."""

    POLICY_METRICS = ('ResourceCount',)

    schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            'execution-options': {'type': 'object'},
            'function-prefix': {'type': 'string'},
            'member-role': {'type': 'string'},
            'packages': {'type': 'array', 'items': {'type': 'string'}},
            # Lambda passthrough config
            'layers': {'type': 'array', 'items': {'type': 'string'}},
            'concurrency': {'type': 'integer'},
            # Do we really still support 2.7 and 3.6?
            'runtime': {'enum': ['python2.7', 'python3.6',
                                 'python3.7', 'python3.8', 'python3.9', 'python3.10',
                                 'python3.11']},
            'role': {'type': 'string'},
            'handler': {'type': 'string'},
            'pattern': {'type': 'object', 'minProperties': 1},
            'timeout': {'type': 'number'},
            'memory': {'type': 'number'},
            'environment': {'type': 'object'},
            'tags': {'type': 'object'},
            'dead_letter_config': {'type': 'object'},
            'kms_key_arn': {'type': 'string'},
            'tracing_config': {'type': 'object'},
            'security_groups': {'type': 'array'},
            'subnets': {'type': 'array'}
        }
    }

    def validate(self):
        """Check name and description lengths and warn on reserved tags.

        :raises PolicyValidationError: if the function prefix plus policy
            name exceeds 64 characters, or the policy description exceeds
            256 characters.
        """
        super(LambdaMode, self).validate()
        prefix = self.policy.data['mode'].get('function-prefix', 'custodian-')
        MAX_LAMBDA_FUNCTION_NAME_LENGTH = 64
        if len(prefix + self.policy.name) > MAX_LAMBDA_FUNCTION_NAME_LENGTH:
            raise PolicyValidationError(
                "Custodian Lambda policies have a max length with prefix of %s"
                " policy:%s prefix:%s" % (
                    MAX_LAMBDA_FUNCTION_NAME_LENGTH,
                    self.policy.name,
                    prefix
                )
            )
        MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH = 256
        if len(self.policy.description) > MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH:
            raise PolicyValidationError(
                'Custodian Lambda policies have a max description length of %s'
                ' policy: %s description: %s' % (
                    MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH,
                    self.policy.name,
                    self.policy.description
                )
            )
        tags = self.policy.data['mode'].get('tags')
        if not tags:
            return
        # 'custodian-' prefixed tags are reserved; only warn, don't reject
        reserved_overlap = [t for t in tags if t.startswith('custodian-')]
        if reserved_overlap:
            log.warning((
                'Custodian reserves policy lambda '
                'tags starting with custodian - policy specifies %s' % (
                    ', '.join(reserved_overlap))))

    def get_member_account_id(self, event):
        """Return the event's ``account`` value, or ``None`` if absent."""
        return event.get('account')

    def get_member_region(self, event):
        """Return the event's ``region`` value, or ``None`` if absent."""
        return event.get('region')

    def assume_member(self, event):
        """Point the policy at the member account named by the event.

        :returns: ``True`` if the session factory was redirected, ``False``
            when no ``member-role`` is configured or the event lacks an
            account id or region.
        """
        # if a member role is defined we're being run out of the master, and we need
        # to assume back into the member for policy execution.
        member_role = self.policy.data['mode'].get('member-role')
        member_id = self.get_member_account_id(event)
        region = self.get_member_region(event)
        if member_role and member_id and region:
            # In the master account we might be multiplexing a hot lambda across
            # multiple member accounts for each event/invocation.
            member_role = member_role.format(account_id=member_id)
            utils.reset_session_cache()
            self.policy.options['account_id'] = member_id
            self.policy.options['region'] = region
            self.policy.session_factory.region = region
            self.policy.session_factory.assume_role = member_role
            self.policy.log.info(
                "Assuming member role:%s", member_role)
            return True
        return False

    def resolve_resources(self, event):
        """Look up the resources an event refers to.

        :returns: the resources fetched for the ids found in the event, or
            an empty list when none of the ids match this resource type.
        :raises ValueError: if no ids can be extracted from the event for
            the policy's mode.
        """
        self.assume_member(event)
        mode = self.policy.data.get('mode', {})
        resource_ids = CloudWatchEvents.get_ids(event, mode)
        if resource_ids is None:
            # Interpolate the mode config into the message.  This used to
            # read ``self.data``, which this class never sets, and passed it
            # as a second exception argument instead of formatting it.
            raise ValueError("Unknown push event mode %s" % (mode,))
        self.policy.log.info('Found resource ids:%s', resource_ids)
        # Handle multi-resource type events, like ec2 CreateTags
        resource_ids = self.policy.resource_manager.match_ids(resource_ids)
        if not resource_ids:
            self.policy.log.warning("Could not find resource ids")
            return []

        resources = self.policy.resource_manager.get_resources(resource_ids)
        if 'debug' in event:
            self.policy.log.info("Resources %s", resources)
        return resources

    def run(self, event, lambda_context):
        """Run policy in push mode against given event.

        Lambda automatically generates cloud watch logs, and metrics
        for us, albeit with some deficiencies, metrics no longer count
        against valid resources matches, but against execution.

        If metrics execution option is enabled, custodian will generate
        metrics per normal.

        :returns: the resources that matched and were processed. When the
            event resolves to nothing, that empty result is returned as
            is. ``None`` when the policy is not runnable for the event or
            no resource passes the filters.
        """
        self.setup_exec_environment(event)
        if not self.policy.is_runnable(event):
            return
        resources = self.resolve_resources(event)
        if not resources:
            return resources
        rcount = len(resources)
        resources = self.policy.resource_manager.filter_resources(
            resources, event)

        if 'debug' in event:
            self.policy.log.info(
                "Filtered resources %d of %d", len(resources), rcount)

        if not resources:
            self.policy.log.info(
                "policy:%s resources:%s no resources matched" % (
                    self.policy.name, self.policy.resource_type))
            return
        return self.run_resource_set(event, resources)

    def setup_exec_environment(self, event):
        """Silence root logging when the mode sets ``log`` to a false value."""
        mode = self.policy.data.get('mode', {})
        if not bool(mode.get("log", True)):
            root = logging.getLogger()
            # map() is lazy in Python 3, so an unconsumed
            # map(root.removeHandler, ...) never removes anything; iterate
            # explicitly over a copy of the handler list.
            for handler in root.handlers[:]:
                root.removeHandler(handler)
            root.handlers = [logging.NullHandler()]

    def run_resource_set(self, event, resources):
        """Apply every policy action to ``resources`` and return them.

        Actions that are ``EventAction`` instances also receive the event.
        Each action's result is written to an ``action-<name>`` output.
        """
        from c7n.actions import EventAction

        with self.policy.ctx as ctx:
            ctx.metrics.put_metric(
                'ResourceCount', len(resources), 'Count', Scope="Policy", buffer=False
            )

            if 'debug' in event:
                self.policy.log.info(
                    "Invoking actions %s", self.policy.resource_manager.actions
                )

            ctx.output.write_file('resources.json', utils.dumps(resources, indent=2))

            for action in self.policy.resource_manager.actions:
                self.policy.log.info(
                    "policy:%s invoking action:%s resources:%d",
                    self.policy.name,
                    action.name,
                    len(resources),
                )
                if isinstance(action, EventAction):
                    results = action.process(resources, event)
                else:
                    results = action.process(resources)
                ctx.output.write_file("action-%s" % action.name, utils.dumps(results))
        return resources

    @property
    def policy_lambda(self):
        """The class used to wrap the policy for publishing."""
        from c7n import mu
        return mu.PolicyLambda

    def provision(self):
        """Publish the policy as a lambda function.

        Stamps a ``custodian-info`` tag holding the mode type and c7n
        version into the mode's tags before publishing.

        :returns: whatever ``LambdaManager.publish`` returns.
        """
        # auto tag lambda policies with mode and version, we use the
        # version in mugc to effect cleanups.
        tags = self.policy.data['mode'].setdefault('tags', {})
        tags['custodian-info'] = "mode=%s:version=%s" % (
            self.policy.data['mode']['type'], version)

        from c7n import mu
        with self.policy.ctx:
            self.policy.log.info(
                "Provisioning policy lambda: %s region: %s", self.policy.name,
                self.policy.options.region)
            try:
                manager = mu.LambdaManager(self.policy.session_factory)
            except ClientError:
                # For cli usage by normal users, don't assume the role just use
                # it for the lambda
                manager = mu.LambdaManager(
                    lambda assume=False: self.policy.session_factory(assume))
            return manager.publish(
                self.policy_lambda(self.policy),
                role=self.policy.options.assume_role)
@execution.register('periodic')
class PeriodicMode(LambdaMode, PullMode):
    """Pull-style policy execution hosted in lambda.

    The policy is deployed as an AWS lambda and run at the cron interval
    the user supplies as ``schedule``.
    """

    POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')

    schema = utils.type_schema(
        'periodic',
        schedule={'type': 'string'},
        rinherit=LambdaMode.schema,
    )

    def run(self, event, lambda_context):
        # the event and lambda context are ignored; this is a plain pull run
        return PullMode.run(self)
@execution.register('phd')
class PHDMode(LambdaMode):
    """Personal Health Dashboard event based policy execution.

    PHD events are triggered by changes in the operations health of
    AWS services and data center resources,

    See `Personal Health Dashboard
    <https://aws.amazon.com/premiumsupport/technology/personal-health-dashboard/>`_
    for more details.
    """

    schema = utils.type_schema(
        'phd',
        events={'type': 'array', 'items': {'type': 'string'}},
        categories={'type': 'array', 'items': {
            'enum': ['issue', 'accountNotification', 'scheduledChange']}},
        statuses={'type': 'array', 'items': {
            'enum': ['open', 'upcoming', 'closed']}},
        rinherit=LambdaMode.schema)

    def validate(self):
        """Check the policy's resource can be used with phd mode.

        ``account`` policies need nothing further. Other resources need a
        ``health-event`` filter in their registry and ``events`` in the
        mode configuration.

        :raises PolicyValidationError: if either requirement is unmet.
        """
        super(PHDMode, self).validate()
        if self.policy.resource_type == 'account':
            return
        if 'health-event' not in self.policy.resource_manager.filter_registry:
            raise PolicyValidationError(
                "policy:%s phd event mode not supported for resource:%s" % (
                    self.policy.name, self.policy.resource_type))
        if 'events' not in self.policy.data['mode']:
            raise PolicyValidationError(
                'policy:%s phd event mode requires events for resource:%s' % (
                    self.policy.name, self.policy.resource_type))

    @staticmethod
    def process_event_arns(client, event_arns):
        """Return the entities affected by the given health event arns.

        :param client: client offering a ``describe_affected_entities``
            paginator.
        :param event_arns: health event arns to look up.
        :returns: list of the ``entities`` found across all result pages.
        """
        entities = []
        paginator = client.get_paginator('describe_affected_entities')
        for event_set in utils.chunks(event_arns, 10):
            # Query one batch at a time.  Passing the whole ``event_arns``
            # list on every iteration returned each entity once per batch
            # and defeated the point of chunking.
            for page in paginator.paginate(
                    filter={'eventArns': list(event_set)}):
                entities.extend(page['entities'])
        return entities

    def resolve_resources(self, event):
        """Fetch the resources affected by the event's health event.

        Each returned resource has the health event arn appended to its
        ``c7n:HealthEvent`` list.
        """
        session = utils.local_session(self.policy.resource_manager.session_factory)
        health = session.client('health', region_name='us-east-1')
        he_arn = event['detail']['eventArn']
        resource_arns = self.process_event_arns(health, [he_arn])

        m = self.policy.resource_manager.get_model()
        # when the model's id field name contains 'arn', only the part of
        # the entity value after the last '/' is used as the id
        if 'arn' in m.id.lower():
            resource_ids = [r['entityValue'].rsplit('/', 1)[-1] for r in resource_arns]
        else:
            resource_ids = [r['entityValue'] for r in resource_arns]

        resources = self.policy.resource_manager.get_resources(resource_ids)
        for r in resources:
            r.setdefault('c7n:HealthEvent', []).append(he_arn)
        return resources
@execution.register('cloudtrail')
class CloudTrailMode(LambdaMode):
    """A lambda policy using cloudwatch events rules on cloudtrail api logs."""

    schema = utils.type_schema(
        'cloudtrail',
        delay={'type': 'integer',
               'description': 'sleep for delay seconds before processing an event'},
        events={'type': 'array', 'items': {
            'oneOf': [
                {'type': 'string'},
                {'type': 'object',
                 'required': ['event', 'source', 'ids'],
                 'properties': {
                     'source': {'type': 'string'},
                     'ids': {'type': 'string'},
                     'event': {'type': 'string'}}}]
        }},
        rinherit=LambdaMode.schema)

    def validate(self):
        """Check the mode's ``events`` and the resource's trail support.

        String events must be known shortcuts in
        ``CloudWatchEvents.trail_events``; dict events must carry an
        ``ids`` expression that compiles.

        :raises AssertionError: if no events are configured or a string
            event is not a known shortcut.
        :raises ValueError: if the resource manager is a child resource
            manager whose resource type lacks ``supports_trailevents``.
        """
        super(CloudTrailMode, self).validate()
        from c7n import query
        events = self.policy.data['mode'].get('events')
        # Raised explicitly rather than via ``assert`` so the checks still
        # run under ``python -O``; exception type and messages are the same.
        if not events:
            raise AssertionError(
                "cloud trail mode requires specifiying events to subscribe")
        for e in events:
            if isinstance(e, str):
                if e not in CloudWatchEvents.trail_events:
                    raise AssertionError("event shortcut not defined: %s" % e)
            if isinstance(e, dict):
                jmespath_compile(e['ids'])
        if isinstance(self.policy.resource_manager, query.ChildResourceManager):
            if not getattr(self.policy.resource_manager.resource_type,
                           'supports_trailevents', False):
                raise ValueError(
                    "resource:%s does not support cloudtrail mode policies" % (
                        self.policy.resource_type))

    def resolve_resources(self, event):
        """Resolve resources after sleeping for the optional ``delay``."""
        # override to enable delay before fetching resources
        delay = self.policy.data.get('mode', {}).get('delay')
        if delay:
            time.sleep(delay)
        return super().resolve_resources(event)
@execution.register('ec2-instance-state')
class EC2InstanceState(LambdaMode):
    """
    Lambda policy triggered by ec2 instance state changes.

    The ``events`` option lists the instance states to react to.

    See `EC2 lifecycles
    <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html>`_
    for more details.
    """

    schema = utils.type_schema(
        'ec2-instance-state',
        rinherit=LambdaMode.schema,
        events={
            'type': 'array',
            'items': {
                'enum': [
                    'pending', 'running', 'shutting-down',
                    'stopped', 'stopping', 'terminated',
                ],
            },
        },
    )
@execution.register('asg-instance-state')
class ASGInstanceState(LambdaMode):
    """Lambda policy triggered by state changes of an asg's ec2 instances.

    The ``events`` option lists the launch/terminate outcomes to react to.

    See `ASG Events
    <https://docs.aws.amazon.com/autoscaling/ec2/userguide/cloud-watch-events.html>`_
    for more details.
    """

    schema = utils.type_schema(
        'asg-instance-state',
        rinherit=LambdaMode.schema,
        events={
            'type': 'array',
            'items': {
                'enum': [
                    'launch-success', 'launch-failure',
                    'terminate-success', 'terminate-failure',
                ],
            },
        },
    )
@execution.register('guard-duty')
class GuardDutyMode(LambdaMode):
    """Incident Response for AWS Guard Duty.

    AWS Guard Duty is a threat detection service that continuously
    monitors for malicious activity and unauthorized behavior. This
    mode runs policies when Guard Duty raises a finding, enabling
    automated incident response. See `Guard Duty
    <https://aws.amazon.com/guardduty/>`_ for more details.
    """

    schema = utils.type_schema('guard-duty', rinherit=LambdaMode.schema)

    supported_resources = ('account', 'ec2', 'iam-user')

    # per resource type: where in the finding event the resource id lives
    id_exprs = {
        'account': jmespath_compile('detail.accountId'),
        'ec2': jmespath_compile('detail.resource.instanceDetails.instanceId'),
        'iam-user': jmespath_compile('detail.resource.accessKeyDetails.userName')}

    def get_member_account_id(self, event):
        return event['detail']['accountId']

    def resolve_resources(self, event):
        """Fetch the single resource identified by the finding event."""
        self.assume_member(event)
        id_expr = self.id_exprs[self.policy.resource_type]
        found = self.policy.resource_manager.get_resources(
            [id_expr.search(event)])
        # For iam users annotate with the access key specified in the finding event
        if found and self.policy.resource_type == 'iam-user':
            key_details = event['detail']['resource']['accessKeyDetails']
            found[0]['c7n:AccessKeys'] = {
                'AccessKeyId': key_details['accessKeyId']}
        return found

    def validate(self):
        """Reject policies whose resource is not in ``supported_resources``."""
        super().validate()
        if self.policy.data['resource'] in self.supported_resources:
            return
        raise ValueError(
            "Policy:%s resource:%s Guard duty mode only supported for %s" % (
                self.policy.data['name'],
                self.policy.data['resource'],
                self.supported_resources))

    def provision(self):
        """Set the mode's ``resource-filter`` for ec2/iam-user, then provision."""
        filter_by_resource = {'ec2': 'Instance', 'iam-user': 'AccessKey'}
        resource = self.policy.data['resource']
        if resource in filter_by_resource:
            self.policy.data['mode']['resource-filter'] = filter_by_resource[resource]
        return super().provision()
@execution.register('config-poll-rule')
class ConfigPollRuleMode(LambdaMode, PullMode):
    """This mode represents a periodic/scheduled AWS config evaluation.

    The primary benefit this mode offers is to support additional resources
    beyond what config supports natively, as it can post evaluations for
    any resource which has a cloudformation type.

    If a resource is natively supported by config it's highly recommended
    to use a `config-rule` mode instead. Deployment will fail unless
    the policy explicitly opts out of that check with `ignore-support-check`.
    This can be useful in cases when a policy resource has native Config
    support, but filters based on related resource attributes.

    :example:

    VPCs have native Config support, but flow logs are a separate resource.
    This policy forces `config-poll-rule` mode to bypass the Config support
    check and evaluate VPC compliance on a schedule.

    .. code-block:: yaml

        policies:
          - name: vpc-flow-logs
            resource: vpc
            mode:
              type: config-poll-rule
              role: arn:aws:iam::{account_id}:role/MyRole
              schedule: One_Hour
              ignore-support-check: True
            filters:
              - not:
                - type: flow-logs
                  destination-type: "s3"
                  enabled: True
                  status: active
                  traffic-type: all
                  destination: "arn:aws:s3:::mys3flowlogbucket"

    This mode effectively receives no data from config, instead it's
    periodically executed by config and polls and evaluates all
    resources. It is equivalent to a periodic policy, except it also
    pushes resource evaluations to config.
    """
    schema = utils.type_schema(
        'config-poll-rule',
        schedule={'enum': [
            "One_Hour",
            "Three_Hours",
            "Six_Hours",
            "Twelve_Hours",
            "TwentyFour_Hours"]},
        **{'ignore-support-check': {'type': 'boolean'}},
        rinherit=LambdaMode.schema)

    def validate(self):
        """Check the policy is usable as a polling config rule.

        :raises PolicyValidationError: if no ``schedule`` is set; if the
            resource has a config type and ``ignore-support-check`` is not
            set; if an event ``pattern`` is configured; or if the resource
            has no cloudformation type.
        """
        super().validate()
        if not self.policy.data['mode'].get('schedule'):
            raise PolicyValidationError(
                "policy:%s config-poll-rule schedule required" % (
                    self.policy.name))
        if (
            self.policy.resource_manager.resource_type.config_type
            and not self.policy.data['mode'].get('ignore-support-check')
        ):
            raise PolicyValidationError(
                "resource:%s fully supported by config and should use mode: config-rule" % (
                    self.policy.resource_type))
        if self.policy.data['mode'].get('pattern'):
            raise PolicyValidationError(
                "policy:%s AWS Config does not support event pattern filtering" % (
                    self.policy.name))
        if not self.policy.resource_manager.resource_type.cfn_type:
            # fill in both placeholders; the message was previously raised
            # with its literal '%s' markers left in
            raise PolicyValidationError((
                'policy:%s resource:%s does not have a cloudformation type'
                ' and is there-fore not supported by config-poll-rule') % (
                    self.policy.name, self.policy.resource_type))

    @staticmethod
    def get_obsolete_evaluations(client, cfg_rule_name, ordering_ts, evaluations):
        """Get list of evaluations that are no longer applicable due to resources being deleted

        :param client: config client used to page through the rule's
            existing compliance details.
        :param cfg_rule_name: name of the config rule to inspect.
        :param ordering_ts: value used as ``OrderingTimestamp`` on the
            evaluations produced.
        :param evaluations: the current evaluations; any previously
            evaluated resource id absent from them is reported.
        :returns: list of ``NOT_APPLICABLE`` evaluations, one per previously
            evaluated resource that has no current evaluation.
        """
        latest_resource_ids = set()
        for latest_eval in evaluations:
            latest_resource_ids.add(latest_eval['ComplianceResourceId'])

        obsolete_evaluations = []
        paginator = client.get_paginator('get_compliance_details_by_config_rule')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        old_evals = paginator.paginate(
            ConfigRuleName=cfg_rule_name,
            ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
            PaginationConfig={'PageSize': 100}).build_full_result().get('EvaluationResults', [])

        for old_eval in old_evals:
            eval_res_qual = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']
            old_resource_id = eval_res_qual['ResourceId']
            if old_resource_id not in latest_resource_ids:
                obsolete_evaluation = {
                    'ComplianceResourceType': eval_res_qual['ResourceType'],
                    'ComplianceResourceId': old_resource_id,
                    'Annotation': 'The rule does not apply.',
                    'ComplianceType': 'NOT_APPLICABLE',
                    'OrderingTimestamp': ordering_ts}
                obsolete_evaluations.append(obsolete_evaluation)
        return obsolete_evaluations

    def _get_client(self):
        """Return a ``config`` client from the policy's session."""
        return utils.local_session(
            self.policy.session_factory).client('config')

    def put_evaluations(self, client, token, evaluations):
        """Submit evaluations in batches of 100 via the manager's retry."""
        for eval_set in utils.chunks(evaluations, 100):
            self.policy.resource_manager.retry(
                client.put_evaluations,
                Evaluations=eval_set,
                ResultToken=token)

    def run(self, event, lambda_context):
        """Evaluate all resources and report compliance to config.

        Resources selected by the policy are reported ``NON_COMPLIANT``,
        the remaining resources of the type ``COMPLIANT``, and previously
        evaluated resources that no longer appear ``NOT_APPLICABLE``.
        Nothing is submitted when the event has no ``resultToken``.

        :returns: list of the ids of the resources the policy matched.
        """
        cfg_event = json.loads(event['invokingEvent'])
        resource_type = self.policy.resource_manager.resource_type.cfn_type
        resource_id = self.policy.resource_manager.resource_type.config_id or \
            self.policy.resource_manager.resource_type.id
        client = self._get_client()
        token = event.get('resultToken')
        cfg_rule_name = event['configRuleName']
        ordering_ts = cfg_event['notificationCreationTime']
        # a filter-less copy of the policy data, used below to list every
        # resource of the type
        policy_data = self.policy.data.copy()
        policy_data.pop("filters", None)

        matched_resources = set()
        unmatched_resources = set()
        for r in PullMode.run(self):
            matched_resources.add(r[resource_id])
        for r in self.policy.resource_manager.get_resource_manager(
                self.policy.resource_type,
                policy_data).resources():
            if r[resource_id] not in matched_resources:
                unmatched_resources.add(r[resource_id])

        non_compliant_evals = [dict(
            ComplianceResourceType=resource_type,
            ComplianceResourceId=r,
            ComplianceType='NON_COMPLIANT',
            OrderingTimestamp=ordering_ts,
            Annotation='The resource is not compliant with policy:%s.' % (
                self.policy.name))
            for r in matched_resources]
        compliant_evals = [dict(
            ComplianceResourceType=resource_type,
            ComplianceResourceId=r,
            ComplianceType='COMPLIANT',
            OrderingTimestamp=ordering_ts,
            Annotation='The resource is compliant with policy:%s.' % (
                self.policy.name))
            for r in unmatched_resources]
        evaluations = non_compliant_evals + compliant_evals
        obsolete_evaluations = self.get_obsolete_evaluations(
            client, cfg_rule_name, ordering_ts, evaluations)
        evaluations = evaluations + obsolete_evaluations

        if evaluations and token:
            self.put_evaluations(client, token, evaluations)

        return list(matched_resources)
@execution.register('config-rule')
class ConfigRuleMode(LambdaMode):
    """A lambda policy that executes as a config service rule.

    The policy is invoked on configuration changes to resources, and the
    outcome for the changed resource is reported back as a single
    evaluation.

    See `AWS Config <https://aws.amazon.com/config/>`_ for more details.
    """
    # Decoded ``invokingEvent`` of the invocation currently being handled.
    cfg_event = None
    schema = utils.type_schema('config-rule', rinherit=LambdaMode.schema)

    def validate(self):
        """Reject resource types without a config type, and event patterns.

        :raises PolicyValidationError: when the resource type has no
            ``config_type`` or the mode block sets ``pattern``.
        """
        super().validate()
        policy = self.policy
        if not policy.resource_manager.resource_type.config_type:
            raise PolicyValidationError(
                "policy:%s AWS Config does not support resource-type:%s" % (
                    policy.name, policy.resource_type))
        if policy.data['mode'].get('pattern'):
            raise PolicyValidationError(
                "policy:%s AWS Config does not support event pattern filtering" % (
                    policy.name))

    def resolve_resources(self, event):
        """Load the single resource described by the stored configuration item."""
        config_source = self.policy.resource_manager.get_source('config')
        item = self.cfg_event['configurationItem']
        return [config_source.load_resource(item)]

    def run(self, event, lambda_context):
        """Evaluate the changed resource and put one evaluation for it.

        :param event: invocation payload; ``invokingEvent`` holds the JSON
            encoded configuration item.
        :param lambda_context: passed through to the parent ``run``.
        :return: resources returned by the parent ``run``, or an empty list
            when the rule does not apply to the item.
        """
        self.cfg_event = json.loads(event['invokingEvent'])
        cfg_item = self.cfg_event['configurationItem']

        # TODO config resource type matches policy check
        not_applicable = event.get('eventLeftScope') or (
            cfg_item['configurationItemStatus'] in (
                "ResourceDeleted",
                "ResourceNotRecorded",
                "ResourceDeletedNotRecorded"))

        if not_applicable:
            resources = []
            evaluation = {
                'annotation': 'The rule does not apply.',
                'compliance_type': 'NOT_APPLICABLE'}
        else:
            resources = super().run(event, lambda_context)
            match = self.policy.data['mode'].get('match-compliant', False)
            self.policy.log.info(
                "found resources:%d match-compliant:%s", len(resources or ()), match)
            # Compliant when "resources were found" agrees with match-compliant.
            if bool(match) == bool(resources):
                evaluation = {
                    'compliance_type': 'COMPLIANT',
                    'annotation': 'The resource is compliant with policy:%s.' % (
                        self.policy.name)}
            else:
                evaluation = {
                    'compliance_type': 'NON_COMPLIANT',
                    'annotation': 'Resource is not compliant with policy:%s' % (
                        self.policy.name)
                }

        session = utils.local_session(self.policy.session_factory)
        client = session.client('config')
        record = {
            'ComplianceResourceType': cfg_item['resourceType'],
            'ComplianceResourceId': cfg_item['resourceId'],
            'ComplianceType': evaluation['compliance_type'],
            'Annotation': evaluation['annotation'],
            # TODO ? if not applicable use current timestamp
            'OrderingTimestamp': cfg_item['configurationItemCaptureTime']}
        client.put_evaluations(
            Evaluations=[record],
            ResultToken=event.get('resultToken', 'No token found.'))
        return resources
def get_session_factory(provider_name, options):
    """Return the session factory of the named cloud provider.

    Only the registry lookup is guarded: a ``KeyError`` raised while the
    provider builds its session factory is a different failure and is
    allowed to propagate instead of being reported as a missing provider.

    :param provider_name: key of the provider in the ``clouds`` registry.
    :param options: passed to the provider's ``get_session_factory``.
    :return: whatever the provider's ``get_session_factory`` returns.
    :raises RuntimeError: when ``provider_name`` is not in ``clouds``.
    """
    try:
        provider_class = clouds[provider_name]
    except KeyError:
        raise RuntimeError(
            "%s provider not installed" % provider_name) from None
    return provider_class().get_session_factory(options)
class PolicyConditionAnd(And):
    """``and`` block for policy conditions, keyed on ``name``."""

    def get_resource_type_id(self):
        """Return the identifier key used for condition variables."""
        return 'name'
class PolicyConditionOr(Or):
    """``or`` block for policy conditions, keyed on ``name``."""

    def get_resource_type_id(self):
        """Return the identifier key used for condition variables."""
        return 'name'
class PolicyConditionNot(Not):
    """``not`` block for policy conditions, keyed on ``name``."""

    def get_resource_type_id(self):
        """Return the identifier key used for condition variables."""
        return 'name'
class PolicyConditions:
    """Execution conditions deciding whether a policy should run.

    The ``conditions`` entries of the policy data are parsed as filters and
    evaluated against a single dictionary of execution variables (policy
    name, region, account, current time, ...) rather than against cloud
    resources.
    """

    filter_registry = FilterRegistry('c7n.policy.filters')
    filter_registry.register('and', PolicyConditionAnd)
    filter_registry.register('or', PolicyConditionOr)
    filter_registry.register('not', PolicyConditionNot)

    def __init__(self, policy, data):
        """Bind the conditions to ``policy`` and load them from ``data``.

        :param policy: owning policy; its options and resource manager are
            read here.
        :param data: policy data dictionary, see :meth:`update`.
        """
        self.policy = policy
        # for value_from usage / we use the conditions class
        # to mimic a resource manager interface. we can't use
        # the actual resource manager as we're overriding block
        # filters which work w/ resource type metadata and our
        # resource here is effectively the execution variables.
        self.config = self.policy.options
        rm = self.policy.resource_manager
        self._cache = rm._cache
        self.session_factory = rm.session_factory
        # used by c7n-org to extend evaluation conditions
        self.env_vars = {}
        self.update(data)

    def update(self, data):
        """Point at new policy data and mark the filters as unparsed."""
        self.data = data
        self.filters = self.data.get('conditions', [])
        self.initialized = False

    def validate(self):
        """Parse the raw condition definitions into filter instances.

        Does nothing when the current data has already been parsed.
        """
        if self.initialized:
            return
        # ``self.filters`` may be the very list stored under
        # ``data['conditions']``. Extend a copy so the converted deprecated
        # filters are not written back into the policy data, where they
        # would be appended again on the next update()/validate() cycle.
        raw_filters = list(self.filters)
        raw_filters.extend(self.convert_deprecated())
        self.filters = self.filter_registry.parse(raw_filters, self)
        self.initialized = True

    def evaluate(self, event=None):
        """Evaluate all conditions against the execution variables.

        :param event: optional event passed through to each filter.
        :return: True when every condition holds (or there are none).
        """
        policy_vars = dict(self.env_vars)
        policy_vars.update({
            'name': self.policy.name,
            'region': self.policy.options.region,
            'resource': self.policy.resource_type,
            'provider': self.policy.provider_name,
            'account_id': self.policy.options.account_id,
            'now': datetime.now(tzutil.tzutc()),
            'policy': self.policy.data
        })

        # note for no filters/conditions, this uses all([]) == true property.
        # The list is built in full, so every filter is processed even after
        # one of them has already returned a falsy result.
        state = all([f.process([policy_vars], event) for f in self.filters])
        if not state:
            self.policy.log.info(
                'Skipping policy:%s due to execution conditions', self.policy.name)
        return state

    def iter_filters(self, block_end=False):
        """Iterate the condition filters via ``c7n.manager.iter_filters``."""
        return iter_filters(self.filters, block_end=block_end)

    def convert_deprecated(self):
        """These deprecated attributes are now recorded as deprecated against the policy.

        :return: filter definitions equivalent to the policy level
            ``region``, ``start`` and ``end`` keys that are present.
        """
        filters = []
        if 'region' in self.policy.data:
            filters.append({'region': self.policy.data['region']})
        if 'start' in self.policy.data:
            filters.append({
                'type': 'value',
                'key': 'now',
                'value_type': 'date',
                'op': 'gte',
                'value': self.policy.data['start']})
        if 'end' in self.policy.data:
            filters.append({
                'type': 'value',
                'key': 'now',
                'value_type': 'date',
                'op': 'lte',
                'value': self.policy.data['end']})
        return filters

    def get_deprecations(self):
        """Return any matching deprecations for the policy fields itself."""
        deprecations = []
        for f in self.filters:
            deprecations.extend(f.get_deprecations())
        return deprecations
class Policy:
    """A policy definition bound to execution options.

    Wraps the raw policy ``data`` and exposes the resource manager built
    from it, the execution conditions, and entry points to validate,
    provision and run the policy in its configured execution mode.
    """

    log = logging.getLogger('custodian.policy')

    deprecations = (
        deprecated.field('region', 'region in condition block'),
        deprecated.field('start', 'value filter in condition block'),
        deprecated.field('end', 'value filter in condition block'),
    )

    def __init__(self, data, options, session_factory=None):
        """Build the execution context, resource manager and conditions.

        :param data: policy definition; must contain ``name``.
        :param options: execution options (region, account_id, dryrun, ...).
        :param session_factory: optional factory; when None one is obtained
            from the policy's provider.
        """
        self.data = data
        self.options = options
        assert "name" in self.data
        self.session_factory = (
            get_session_factory(self.provider_name, options)
            if session_factory is None else session_factory)
        self.ctx = ExecutionContext(self.session_factory, self, self.options)
        self.resource_manager = self.load_resource_manager()
        self.conditions = PolicyConditions(self, data)

    def __repr__(self):
        details = (self.resource_type, self.name, self.options.region)
        return "<Policy resource:%s name:%s region:%s>" % details

    @property
    def name(self) -> str:
        """Policy name taken from the policy data."""
        return self.data['name']

    @property
    def description(self) -> str:
        """Policy description, empty string when absent."""
        return self.data.get('description', '')

    @property
    def resource_type(self) -> str:
        """Value of the ``resource`` key of the policy data."""
        return self.data['resource']

    @property
    def provider_name(self) -> str:
        """Provider name derived from the policy data."""
        return get_policy_provider(self.data)

    def is_runnable(self, event=None):
        """Return the result of evaluating the execution conditions."""
        return self.conditions.evaluate(event)

    # Limits a policy may declare on how many resources it can affect.
    @property
    def max_resources(self):
        """Value of ``max-resources``, or None."""
        return self.data.get('max-resources')

    @property
    def max_resources_percent(self):
        """Value of ``max-resources-percent``, or None."""
        return self.data.get('max-resources-percent')

    @property
    def tags(self):
        """Policy tags, an empty tuple when absent."""
        return self.data.get('tags', ())

    def get_cache(self):
        """Return the cache held by the resource manager."""
        return self.resource_manager._cache

    @property
    def execution_mode(self):
        """Type name of the execution mode, ``pull`` when no mode is set."""
        mode_block = self.data.get('mode', {'type': 'pull'})
        return mode_block['type']

    def get_execution_mode(self):
        """Instantiate the execution mode, or return None on a KeyError."""
        try:
            mode_class = execution[self.execution_mode]
        except KeyError:
            return None
        else:
            return mode_class(self)

    @property
    def is_lambda(self):
        """True when the policy data carries a ``mode`` block."""
        return 'mode' in self.data

    def validate(self):
        """Validate conditions, mode, dates, resource manager, filters, actions.

        :raises PolicyValidationError: when the execution mode is unknown
            (components may raise their own validation errors).
        """
        self.conditions.validate()
        mode = self.get_execution_mode()
        if mode is None:
            raise PolicyValidationError(
                "Invalid Execution mode in policy %s" % (self.data,))
        mode.validate()
        self.validate_policy_start_stop()
        self.resource_manager.validate()
        for policy_filter in self.resource_manager.filters:
            policy_filter.validate()
        for policy_action in self.resource_manager.actions:
            policy_action.validate()

    def get_variables(self, variables=None):
        """Get runtime variables for policy interpolation.

        Runtime variables are merged into the passed in variables if any.
        As a side effect, a ``mode.role`` that does not start with
        ``arn:aws`` is rewritten in the policy data as a full role ARN.

        :param variables: optional mapping that receives the runtime values.
        :return: the mapping with the runtime variables applied.
        """
        # Global policy variable expansion has to carry forward the local
        # vocabularies of various filters/actions; those are deferred by
        # mapping the name back onto its own format string.
        #
        # See https://github.com/cloud-custodian/cloud-custodian/issues/2330
        variables = variables or {}

        partition = utils.get_partition(self.options.region)
        if 'mode' in self.data:
            mode_block = self.data['mode']
            if 'role' in mode_block and not mode_block['role'].startswith("arn:aws"):
                mode_block['role'] = "arn:%s:iam::%s:role/%s" % (
                    partition, self.options.account_id, mode_block['role'])

        # tag action pyformat-date handling; expansion of ``now`` is deferred
        # until runtime for serverless modes.
        if isinstance(self.get_execution_mode(), ServerlessExecutionMode):
            now_value = utils.DeferredFormatString('now')
        else:
            now_value = utils.FormatDate(datetime.utcnow())

        runtime = {
            # standard runtime variables for interpolation
            'account': '{account}',
            'account_id': self.options.account_id,
            'partition': partition,
            'region': self.options.region,
            # notify action
            'policy': self.data,
            'event': '{event}',
            # mark for op action
            'op': '{op}',
            'action_date': '{action_date}',
            'now': now_value,
            # account increase limit action
            'service': '{service}',
            # s3 set logging action
            'bucket_region': '{bucket_region}',
            'bucket_name': '{bucket_name}',
            'source_bucket_name': '{source_bucket_name}',
            'source_bucket_region': '{source_bucket_region}',
            'target_bucket_name': '{target_bucket_name}',
            'target_prefix': '{target_prefix}',
            'LoadBalancerName': '{LoadBalancerName}',
        }
        variables.update(runtime)
        return variables

    def expand_variables(self, variables):
        """Expand variables in policy data.

        Replaces ``self.data`` with the formatted copy, then refreshes the
        conditions and the resource manager from it.
        """
        formatter = VarFormat()
        # format_string_values returns a copy of the data
        expanded = utils.format_string_values(
            self.data, formatter=formatter.format, **variables)

        # member-role is only expanded at runtime, keep the original value.
        if 'member-role' in expanded.get('mode', {}):
            expanded['mode']['member-role'] = self.data['mode']['member-role']

        self.data = expanded

        # conditions must follow the new data
        self.conditions.update(self.data)

        # Rebuild filters/actions from the expanded data, holding on to the
        # previous manager for the compatibility step below.
        previous_manager = self.resource_manager
        self.resource_manager = self.load_resource_manager()

        # Compatibility: notify action subject lines support embedded
        # jinja2 passed through to the mailer, so restore the unexpanded
        # subject on the reloaded action.
        action_pairs = zip(previous_manager.actions, self.resource_manager.actions)
        for before, after in action_pairs:
            if before.type == 'notify' and 'subject' in before.data:
                after.data['subject'] = before.data['subject']

    def push(self, event, lambda_ctx=None):
        """Run the execution mode with an event and optional lambda context."""
        return self.get_execution_mode().run(event, lambda_ctx)

    def provision(self):
        """Provision policy as a lambda function."""
        return self.get_execution_mode().provision()

    def poll(self):
        """Query resources and apply policy."""
        return self.get_execution_mode().run()

    def get_permissions(self):
        """get permissions needed by this policy"""
        required = set()
        required.update(self.resource_manager.get_permissions())
        for policy_filter in self.resource_manager.filters:
            required.update(policy_filter.get_permissions())
        for policy_action in self.resource_manager.actions:
            required.update(policy_action.get_permissions())
        return required

    def _trim_runtime_filters(self):
        """Apply ``trim_runtime`` to the condition and resource filters."""
        from c7n.filters.core import trim_runtime
        trim_runtime(self.conditions.filters)
        trim_runtime(self.resource_manager.filters)

    def __call__(self):
        """Run policy in default mode"""
        mode = self.get_execution_mode()
        serverless = isinstance(mode, ServerlessExecutionMode)
        if serverless or self.options.dryrun:
            self._trim_runtime_filters()

        if self.options.dryrun:
            return PullMode(self).run()
        if not self.is_runnable():
            return []
        if serverless:
            return mode.provision()
        return mode.run()

    run = __call__

    def _write_file(self, rel_path, value):
        """Deprecated shim that forwards to ``ctx.output.write_file``.

        No longer called within c7n; kept so external callers of this
        private method do not break.
        """
        log.warning("policy _write_file is deprecated, use ctx.output.write_file")
        self.ctx.output.write_file(rel_path, value)

    def load_resource_manager(self):
        """Instantiate the resource manager for the policy's resource type."""
        manager_class = get_resource_class(self.data.get('resource'))
        return manager_class(self.ctx, self.data)

    def validate_policy_start_stop(self):
        """Check that ``tz``, ``start`` and ``end`` of the policy are parsable.

        :raises PolicyValidationError: when ``tz`` cannot be resolved.
        :raises ValueError: when ``start`` or ``end`` cannot be parsed.
        """
        policy_name = self.data.get('name')
        policy_tz = self.data.get('tz')
        boundaries = [self.data.get('start'), self.data.get('end')]

        if policy_tz:
            try:
                resolved_tz = tzutil.gettz(policy_tz)
            except Exception as e:
                raise PolicyValidationError(
                    "Policy: %s TZ not parsable: %s, %s" % (
                        policy_name, policy_tz, e))

            # Type will be tzwin on windows, but tzwin is null on linux
            if not isinstance(resolved_tz, tzutil.tzfile):
                if not (tzutil.tzwin and isinstance(resolved_tz, tzutil.tzwin)):
                    raise PolicyValidationError(
                        "Policy: %s TZ not parsable: %s" % (
                            policy_name, policy_tz))

        for boundary in boundaries:
            if not boundary:
                continue
            try:
                parser.parse(boundary)
            except Exception as e:
                raise ValueError(
                    "Policy: %s Date/Time not parsable: %s, %s" % (
                        policy_name, boundary, e))

    def get_deprecations(self):
        """Return any matching deprecations for the policy fields itself."""
        return deprecated.check_deprecations(self, "policy")