Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/policy.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3from datetime import datetime
4import json
5import fnmatch
6import itertools
7import logging
8import os
9import time
10from typing import List
12from dateutil import parser, tz as tzutil
14from c7n.cwe import CloudWatchEvents
15from c7n.ctx import ExecutionContext
16from c7n.exceptions import PolicyValidationError, ClientError, ResourceLimitExceeded
17from c7n.filters import FilterRegistry, And, Or, Not
18from c7n.manager import iter_filters
19from c7n.output import DEFAULT_NAMESPACE
20from c7n.resources import load_resources
21from c7n.registry import PluginRegistry
22from c7n.provider import clouds, get_resource_class
23from c7n import deprecated, utils
24from c7n.version import version
25from c7n.query import RetryPageIterator
26from c7n.varfmt import VarFormat
27from c7n.utils import get_policy_provider, jmespath_compile
log = logging.getLogger('c7n.policy')


def load(options, path, format=None, validate=True, vars=None):
    """Load a policy collection from a policy file or a directory of them.

    :param options: run options handed to the loaded policies.
    :param path: path to a single policy file, or to a directory that is
        loaded with ``c7n.loader.DirectoryLoader``.
    :param format: optional explicit file format, passed to ``utils.load_file``.
    :param validate: when true, schema-validate the file contents and call
        ``validate()`` on every loaded policy (also forwarded to the
        directory loader).
    :param vars: optional template variables, passed to ``utils.load_file``.
    :returns: a ``PolicyCollection``; ``None`` when the file holds a bare
        list (the ``policies:`` key is probably missing) or has no
        ``policies`` entry.
    :raises IOError: if ``path`` does not exist, or exists but is neither a
        regular file nor a directory.
    :raises PolicyValidationError: if schema validation reports errors.
    """
    # should we do os.path.expanduser here?
    if not os.path.exists(path):
        raise IOError("Invalid path for config %r" % path)
    is_dir = os.path.isdir(path)
    if not is_dir and not os.path.isfile(path):
        # e.g. a device node or fifo: previously this fell through to the
        # file handling below and failed with an UnboundLocalError on `data`.
        raise IOError("Config path %r is neither a file nor a directory" % path)

    from c7n.schema import validate as schema_validate, StructureParser
    if is_dir:
        from c7n.loader import DirectoryLoader
        collection = DirectoryLoader(options).load_directory(path, validate)
        if validate:
            for p in collection:
                p.validate()
        return collection

    data = utils.load_file(path, format=format, vars=vars)

    structure = StructureParser()
    structure.validate(data)
    rtypes = structure.get_resource_types(data)
    load_resources(rtypes)

    if isinstance(data, list):
        log.warning('yaml in invalid format. The "policies:" line is probably missing.')
        return None

    if validate:
        errors = schema_validate(data, resource_types=rtypes)
        if errors:
            raise PolicyValidationError(
                "Failed to validate policy %s \n %s" % (
                    errors[1], errors[0]))

    # Test for empty policy file
    if not data or data.get('policies') is None:
        return None

    collection = PolicyCollection.from_data(data, options)
    if validate:
        # non schema validation of policies
        for p in collection:
            p.validate()
    return collection
class PolicyCollection:
    """An ordered collection of policies that share one set of run options."""

    log = logging.getLogger('c7n.policies')

    def __init__(self, policies: 'List[Policy]', options):
        self.options = options
        self.policies = policies

    @classmethod
    def from_data(cls, data: dict, options, session_factory=None):
        """Build a collection from a parsed policy document.

        Each entry of ``data['policies']`` is wrapped in a ``Policy``. When
        ``session_factory`` is not given, ``cls.session_factory()`` is used.
        """
        # session factory param introduction needs an audit and review
        # on tests.
        sf = session_factory if session_factory else cls.session_factory()
        policies = [Policy(p, options, session_factory=sf)
                    for p in data.get('policies', ())]
        return cls(policies, options)

    def __add__(self, other):
        return self.__class__(self.policies + other.policies, self.options)

    def filter(self, policy_patterns=(), resource_types=(), modes=()):
        """Return a new collection narrowed by name, resource type and mode.

        An empty criterion is ignored. Within one criterion the matches for
        each value are unioned; across criteria they are intersected. The
        result keeps the order of ``self.policies``.
        """
        results = self.policies
        results = self._filter_by_patterns(results, policy_patterns)
        results = self._filter_by_resource_types(results, resource_types)
        results = self._filter_by_modes(results, modes)
        # Bring the result set back into the original order of self.policies.
        # Track membership by identity in a set so this is linear instead of
        # a list membership test per policy.
        selected = {id(p) for p in results}
        results = [p for p in self.policies if id(p) in selected]
        return self.__class__(results, self.options)

    @staticmethod
    def _union_matches(policies, values, match_one):
        """Union ``match_one(policies, value)`` over ``values``.

        Empty ``values`` means "no constraint" and returns ``policies``
        unchanged. Duplicates (by identity) are dropped; order is first-match
        order across ``values``.
        """
        if not values:
            return policies
        seen = set()
        results = []
        for value in values:
            for policy in match_one(policies, value):
                if id(policy) not in seen:
                    seen.add(id(policy))
                    results.append(policy)
        return results

    def _filter_by_patterns(self, policies, patterns):
        """
        Takes a list of policies and returns only those matching the given glob
        patterns
        """
        return self._union_matches(policies, patterns, self._filter_by_pattern)

    def _filter_by_pattern(self, policies, pattern):
        """
        Takes a list of policies and returns only those matching the given glob
        pattern, warning when nothing matches
        """
        results = [p for p in policies if fnmatch.fnmatch(p.name, pattern)]
        if not results:
            self.log.warning((
                'Policy pattern "{}" '
                'did not match any policies.').format(pattern))
        return results

    def _filter_by_resource_types(self, policies, resource_types):
        """
        Takes a list of policies and returns only those matching the given
        resource types
        """
        return self._union_matches(
            policies, resource_types, self._filter_by_resource_type)

    def _filter_by_resource_type(self, policies, resource_type):
        """
        Takes a list policies and returns only those matching the given resource
        type, warning when nothing matches
        """
        results = [p for p in policies if p.resource_type == resource_type]
        if not results:
            self.log.warning((
                'Resource type "{}" '
                'did not match any policies.').format(resource_type))
        return results

    def _filter_by_modes(self, policies, modes):
        """
        Takes a list of policies and returns only those matching a given mode
        """
        return self._union_matches(policies, modes, self._filter_by_mode)

    def _filter_by_mode(self, policies, mode):
        """
        Takes a list of policies and returns only those whose execution mode
        type equals ``mode``, warning when nothing matches
        """
        results = [p for p in policies if p.get_execution_mode().type == mode]
        if not results:
            self.log.warning((
                'Filter by modes type "{}" '
                'did not match any policies.').format(mode))
        return results

    def __iter__(self):
        return iter(self.policies)

    def __contains__(self, policy_name):
        # Membership is by policy *name*, not by policy object.
        return any(p.name == policy_name for p in self.policies)

    def __len__(self):
        return len(self.policies)

    @property
    def resource_types(self):
        """resource types used by the collection."""
        return {p.resource_type for p in self.policies}

    # cli/collection tests patch this
    @classmethod
    def session_factory(cls):
        return None
class PolicyExecutionMode:
    """Base class for the ways a policy can be executed.

    Subclasses must supply ``run`` and ``get_logs``; ``provision`` and
    ``validate`` do nothing by default.
    """

    POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')
    permissions = ()

    def __init__(self, policy):
        self.policy = policy

    def run(self, event=None, lambda_context=None):
        """Execute the policy (must be implemented by a subclass)."""
        raise NotImplementedError("subclass responsibility")

    def provision(self):
        """Create any supporting infrastructure; nothing by default."""

    def get_logs(self, start, end):
        """Fetch the policy's logs (must be implemented by a subclass)."""
        raise NotImplementedError("subclass responsibility")

    def validate(self):
        """Check mode-specific configuration; nothing to check by default."""

    def get_permissions(self):
        return self.permissions

    def get_metrics(self, start, end, period):
        """Fetch CloudWatch statistics for the policy's metrics.

        The metric list is ``POLICY_METRICS`` plus any ``metrics`` declared
        by the resource manager's actions and filters. An entry is either a
        metric name, or a ``(name, extra_dimensions)`` pair whose dimensions
        are layered over the per-policy defaults.

        :returns: dict mapping each metric name to its ``Datapoints`` list.
        """
        base_dims = {
            'Policy': self.policy.name, 'ResType': self.policy.resource_type,
            'Scope': 'Policy'}

        manager = self.policy.resource_manager
        wanted = list(self.POLICY_METRICS)
        # Actions and filters may publish their own custom metrics.
        for element in itertools.chain(manager.actions, manager.filters):
            if element.metrics:
                wanted.extend(element.metrics)

        cloudwatch = utils.local_session(self.policy.session_factory).client('cloudwatch')

        collected = {}
        for entry in wanted:
            if isinstance(entry, str):
                name, dims = entry, base_dims
            else:
                name, extra = entry
                dims = dict(base_dims)
                dims.update(extra)
            response = cloudwatch.get_metric_statistics(
                Namespace=DEFAULT_NAMESPACE,
                Dimensions=[{'Name': key, 'Value': val} for key, val in dims.items()],
                Statistics=['Sum', 'Average'],
                StartTime=start,
                EndTime=end,
                Period=period,
                MetricName=name)
            collected[name] = response['Datapoints']
        return collected

    def get_deprecations(self):
        # Modes have no data dict of their own; their settings live under
        # the policy's ``mode`` key, so deprecations are checked against that.
        mode_data = self.policy.data.get('mode', {})
        return deprecated.check_deprecations(self, data=mode_data)
class ServerlessExecutionMode(PolicyExecutionMode):
    """Execution mode for policies deployed to a serverless runtime.

    Unlike the base class, provisioning is mandatory here: every hook
    raises until a concrete subclass overrides it.
    """

    def provision(self):
        """Deploy the serverless resources backing the policy (subclass hook)."""
        raise NotImplementedError("subclass responsibility")

    def run(self, event=None, lambda_context=None):
        """Handle one invocation of the deployed policy (subclass hook)."""
        raise NotImplementedError("subclass responsibility")

    def get_logs(self, start, end):
        """Fetch the deployed function's logs (subclass hook)."""
        raise NotImplementedError("subclass responsibility")
# Registry of execution mode classes, keyed by the name each class is
# registered under (e.g. 'pull', 'periodic').
execution = PluginRegistry('c7n.execution')


@execution.register('pull')
class PullMode(PolicyExecutionMode):
    """Pull mode execution of a policy.

    Fetches resources from the cloud provider through the policy's resource
    manager, then runs the policy's actions against them.
    """

    schema = utils.type_schema('pull')

    def run(self, *args, **kw):
        """Fetch resources and run actions; return the matched resources.

        Returns ``[]`` when the policy is not runnable or nothing matched.
        On a dry run the matches are returned without running any action.
        Re-raises ``ResourceLimitExceeded`` after logging it and recording a
        metric.
        """
        policy = self.policy
        if not policy.is_runnable():
            return []

        with policy.ctx as ctx:
            policy.log.debug(
                "Running policy:%s resource:%s region:%s c7n:%s",
                policy.name,
                policy.resource_type,
                policy.options.region or 'default',
                version,
            )

            fetch_started = time.time()
            try:
                resources = policy.resource_manager.resources()
            except ResourceLimitExceeded as err:
                policy.log.error(str(err))
                ctx.metrics.put_metric(
                    'ResourceLimitExceeded', err.selection_count, "Count")
                raise

            fetch_elapsed = time.time() - fetch_started
            policy.log.info(
                "policy:%s resource:%s region:%s count:%d time:%0.2f",
                policy.name,
                policy.resource_type,
                policy.options.region,
                len(resources),
                fetch_elapsed,
            )
            ctx.metrics.put_metric(
                "ResourceCount", len(resources), "Count", Scope="Policy")
            ctx.metrics.put_metric("ResourceTime", fetch_elapsed, "Seconds", Scope="Policy")
            ctx.output.write_file('resources.json', utils.dumps(resources, indent=2))

            if not resources:
                return []

            if policy.options.dryrun:
                policy.log.debug("dryrun: skipping actions")
                return resources

            actions_started = time.time()
            for action in policy.resource_manager.actions:
                action_started = time.time()
                with ctx.tracer.subsegment('action:%s' % action.type):
                    results = action.process(resources)
                policy.log.info(
                    "policy:%s action:%s"
                    " resources:%d"
                    " execution_time:%0.2f"
                    % (policy.name, action.name, len(resources),
                       time.time() - action_started)
                )
                if results:
                    ctx.output.write_file(
                        "action-%s" % action.name, utils.dumps(results))
            ctx.metrics.put_metric(
                "ActionTime", time.time() - actions_started, "Seconds", Scope="Policy")
            return resources
class LambdaMode(ServerlessExecutionMode):
    """A policy that runs/executes in lambda."""

    POLICY_METRICS = ('ResourceCount',)

    schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            'execution-options': {'type': 'object'},
            'function-prefix': {'type': 'string'},
            'member-role': {'type': 'string'},
            'packages': {'type': 'array', 'items': {'type': 'string'}},
            # Lambda passthrough config
            'layers': {'type': 'array', 'items': {'type': 'string'}},
            'concurrency': {'type': 'integer'},
            'runtime': {'enum': ['python3.8', 'python3.9', 'python3.10',
                                 'python3.11', 'python3.12']},
            'role': {'type': 'string'},
            'handler': {'type': 'string'},
            'pattern': {'type': 'object', 'minProperties': 1},
            'timeout': {'type': 'number'},
            'memory': {'type': 'number'},
            'environment': {'type': 'object'},
            'tags': {'type': 'object'},
            'dead_letter_config': {'type': 'object'},
            'kms_key_arn': {'type': 'string'},
            'tracing_config': {'type': 'object'},
            'security_groups': {'type': 'array'},
            'subnets': {'type': 'array'}
        }
    }

    def validate(self):
        """Validate lambda-specific settings.

        :raises PolicyValidationError: if ``function-prefix`` (default
            ``custodian-``) plus the policy name exceeds 64 characters, or
            the policy description exceeds 256 characters.

        Only warns (does not fail) when user tags start with the reserved
        ``custodian-`` prefix.
        """
        super(LambdaMode, self).validate()
        prefix = self.policy.data['mode'].get('function-prefix', 'custodian-')
        MAX_LAMBDA_FUNCTION_NAME_LENGTH = 64
        if len(prefix + self.policy.name) > MAX_LAMBDA_FUNCTION_NAME_LENGTH:
            raise PolicyValidationError(
                "Custodian Lambda policies have a max length with prefix of %s"
                " policy:%s prefix:%s" % (
                    MAX_LAMBDA_FUNCTION_NAME_LENGTH,
                    self.policy.name,
                    prefix
                )
            )
        MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH = 256
        if len(self.policy.description) > MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH:
            raise PolicyValidationError(
                'Custodian Lambda policies have a max description length of %s'
                ' policy: %s description: %s' % (
                    MAX_LAMBDA_FUNCTION_DESCRIPTION_LENGTH,
                    self.policy.name,
                    self.policy.description
                )
            )
        tags = self.policy.data['mode'].get('tags')
        if not tags:
            return
        # provision() writes its own custodian-* tags, so user tags with that
        # prefix may be overwritten.
        reserved_overlap = [t for t in tags if t.startswith('custodian-')]
        if reserved_overlap:
            log.warning((
                'Custodian reserves policy lambda '
                'tags starting with custodian - policy specifies %s' % (
                    ', '.join(reserved_overlap))))

    def get_member_account_id(self, event):
        return event.get('account')

    def get_member_region(self, event):
        return event.get('region')

    def assume_member(self, event):
        """Switch the policy session into the member account named by ``event``.

        Only acts when ``member-role`` is configured and the event carries
        both an account id and a region. The role string is formatted with
        ``account_id``.

        :returns: True if a member role was assumed, else False.
        """
        # if a member role is defined we're being run out of the master, and we need
        # to assume back into the member for policy execution.
        member_role = self.policy.data['mode'].get('member-role')
        member_id = self.get_member_account_id(event)
        region = self.get_member_region(event)
        if member_role and member_id and region:
            # In the master account we might be multiplexing a hot lambda across
            # multiple member accounts for each event/invocation.
            member_role = member_role.format(account_id=member_id)
            utils.reset_session_cache()
            self.policy.options['account_id'] = member_id
            self.policy.options['region'] = region
            self.policy.session_factory.region = region
            self.policy.session_factory.assume_role = member_role
            self.policy.log.info(
                "Assuming member role:%s", member_role)
            return True
        return False

    def resolve_resources(self, event):
        """Load the resources referenced by ``event``.

        :returns: the resources, or ``[]`` when none of the event's ids
            match this policy's resource manager.
        :raises ValueError: if the event's resource ids cannot be extracted
            for the configured mode.
        """
        self.assume_member(event)
        mode = self.policy.data.get('mode', {})
        resource_ids = CloudWatchEvents.get_ids(event, mode)
        if resource_ids is None:
            # Previously referenced a nonexistent ``self.data`` (raising
            # AttributeError) and passed the value as a second argument
            # instead of formatting it into the message.
            raise ValueError("Unknown push event mode %s" % (mode,))
        self.policy.log.info('Found resource ids:%s', resource_ids)
        # Handle multi-resource type events, like ec2 CreateTags
        resource_ids = self.policy.resource_manager.match_ids(resource_ids)
        if not resource_ids:
            self.policy.log.warning("Could not find resource ids")
            return []

        resources = self.policy.resource_manager.get_resources(resource_ids)
        if 'debug' in event:
            self.policy.log.info("Resources %s", resources)
        return resources

    def run(self, event, lambda_context):
        """Run policy in push mode against given event.

        Lambda automatically generates cloud watch logs, and metrics
        for us, albeit with some deficiencies, metrics no longer count
        against valid resources matches, but against execution.

        If metrics execution option is enabled, custodian will generate
        metrics per normal.

        Returns None when the policy is not runnable or no resource survives
        filtering, the (empty) result of ``resolve_resources`` when it finds
        nothing, and otherwise the resources processed by the actions.
        """
        self.setup_exec_environment(event)
        if not self.policy.is_runnable(event):
            return
        resources = self.resolve_resources(event)
        if not resources:
            return resources
        rcount = len(resources)
        resources = self.policy.resource_manager.filter_resources(
            resources, event)

        if 'debug' in event:
            self.policy.log.info(
                "Filtered resources %d of %d", len(resources), rcount)

        if not resources:
            self.policy.log.info(
                "policy:%s resources:%s no resources matched" % (
                    self.policy.name, self.policy.resource_type))
            return
        return self.run_resource_set(event, resources)

    def setup_exec_environment(self, event):
        """Silence root logging when the mode sets ``log: false``.

        Detaches every handler from the root logger and installs a single
        ``NullHandler`` in their place.
        """
        mode = self.policy.data.get('mode', {})
        if not bool(mode.get("log", True)):
            root = logging.getLogger()
            # ``map`` is lazy in Python 3, so the previous
            # ``map(root.removeHandler, ...)`` never ran; detach explicitly.
            for handler in root.handlers[:]:
                root.removeHandler(handler)
            root.handlers = [logging.NullHandler()]

    def run_resource_set(self, event, resources):
        """Run every action on ``resources`` and record outputs/metrics.

        ``EventAction`` instances also receive the triggering event.
        """
        from c7n.actions import EventAction

        with self.policy.ctx as ctx:
            ctx.metrics.put_metric(
                'ResourceCount', len(resources), 'Count', Scope="Policy", buffer=False
            )

            if 'debug' in event:
                self.policy.log.info(
                    "Invoking actions %s", self.policy.resource_manager.actions
                )

            ctx.output.write_file('resources.json', utils.dumps(resources, indent=2))

            for action in self.policy.resource_manager.actions:
                self.policy.log.info(
                    "policy:%s invoking action:%s resources:%d",
                    self.policy.name,
                    action.name,
                    len(resources),
                )
                if isinstance(action, EventAction):
                    results = action.process(resources, event)
                else:
                    results = action.process(resources)
                ctx.output.write_file("action-%s" % action.name, utils.dumps(results))
        return resources

    @property
    def policy_lambda(self):
        from c7n import mu
        return mu.PolicyLambda

    def provision(self):
        """Publish the policy as a lambda function and return the result.

        Adds a ``custodian-info`` tag (and, for ``schedule`` mode, a
        ``custodian-schedule`` tag) to the mode's tags before publishing.
        """
        # auto tag lambda policies with mode and version, we use the
        # version in mugc to effect cleanups.
        tags = self.policy.data['mode'].setdefault('tags', {})
        tags['custodian-info'] = "mode=%s:version=%s" % (
            self.policy.data['mode']['type'], version)
        # auto tag with schedule name and group to link function to
        # EventBridge schedule when using schedule mode
        if self.policy.data['mode']['type'] == 'schedule':
            prefix = self.policy.data['mode'].get('function-prefix', 'custodian-')
            name = self.policy.data['name']
            group = self.policy.data['mode'].get('group-name', 'default')
            tags['custodian-schedule'] = f'name={prefix + name}:group={group}'

        from c7n import mu
        with self.policy.ctx:
            self.policy.log.info(
                "Provisioning policy lambda: %s region: %s", self.policy.name,
                self.policy.options.region)
            try:
                manager = mu.LambdaManager(self.policy.session_factory)
            except ClientError:
                # For cli usage by normal users, don't assume the role just use
                # it for the lambda
                manager = mu.LambdaManager(
                    lambda assume=False: self.policy.session_factory(assume))
            return manager.publish(
                self.policy_lambda(self.policy),
                role=self.policy.options.assume_role)
@execution.register('periodic')
class PeriodicMode(LambdaMode, PullMode):
    """A pull-mode policy executed inside AWS Lambda on a schedule.

    The function is invoked at the user supplied cron interval by an
    EventBridge rule, and each invocation performs a full pull-mode run.
    """

    # LambdaMode narrows the metric set; pull-mode runs report all three.
    POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')

    schema = utils.type_schema(
        'periodic',
        rinherit=LambdaMode.schema,
        schedule={'type': 'string'},
    )

    def run(self, event, lambda_context):
        # The event payload is not consulted; delegate straight to pull mode.
        pull_run = PullMode.run
        return pull_run(self)
@execution.register('schedule')
class ScheduleMode(LambdaMode, PullMode):
    """A pull-mode policy executed inside AWS Lambda on a schedule.

    The function is invoked at the user supplied cron interval by
    EventBridge Scheduler, and each invocation performs a full pull-mode run.
    """

    # LambdaMode narrows the metric set; pull-mode runs report all three.
    POLICY_METRICS = ('ResourceCount', 'ResourceTime', 'ActionTime')

    schema = utils.type_schema(
        'schedule',
        rinherit=LambdaMode.schema,
        required=['schedule'],
        **{
            'schedule': {'type': 'string'},
            'timezone': {'type': 'string'},
            'start-date': {'type': 'string'},
            'end-date': {'type': 'string'},
            'scheduler-role': {'type': 'string'},
            'group-name': {'type': 'string'},
        })

    def run(self, event, lambda_context):
        # The event payload is not consulted; delegate straight to pull mode.
        pull_run = PullMode.run
        return pull_run(self)
@execution.register('phd')
class PHDMode(LambdaMode):
    """Personal Health Dashboard event based policy execution.

    PHD events are triggered by changes in the operations health of
    AWS services and data center resources.

    See `Personal Health Dashboard
    <https://aws.amazon.com/premiumsupport/technology/personal-health-dashboard/>`_
    for more details.
    """

    schema = utils.type_schema(
        'phd',
        events={'type': 'array', 'items': {'type': 'string'}},
        categories={'type': 'array', 'items': {
            'enum': ['issue', 'accountNotification', 'scheduledChange']}},
        statuses={'type': 'array', 'items': {
            'enum': ['open', 'upcoming', 'closed']}},
        rinherit=LambdaMode.schema)

    def validate(self):
        """Check that the resource type can be tied to health events.

        ``account`` policies are always accepted. Any other resource type
        must register a ``health-event`` filter, and the mode must list
        ``events``.

        :raises PolicyValidationError: when either requirement is not met.
        """
        super(PHDMode, self).validate()
        if self.policy.resource_type == 'account':
            return
        if 'health-event' not in self.policy.resource_manager.filter_registry:
            raise PolicyValidationError(
                "policy:%s phd event mode not supported for resource:%s" % (
                    self.policy.name, self.policy.resource_type))
        if 'events' not in self.policy.data['mode']:
            raise PolicyValidationError(
                'policy:%s phd event mode requires events for resource:%s' % (
                    self.policy.name, self.policy.resource_type))

    @staticmethod
    def process_event_arns(client, event_arns):
        """Return the affected entities for the given health event ARNs.

        ARNs are queried in batches of at most 10 per
        ``describe_affected_entities`` call (presumably mirroring the AWS
        Health ``eventArns`` filter limit), and every page is collected.

        :param client: an AWS Health client.
        :param event_arns: iterable of health event ARNs.
        :returns: list of the ``entities`` entries from every page.
        """
        entities = []
        paginator = client.get_paginator('describe_affected_entities')
        for event_set in utils.chunks(event_arns, 10):
            # Query only the current batch. Previously every batch re-sent the
            # full ARN list, duplicating results and oversizing the filter
            # whenever more than 10 ARNs were passed.
            for page in paginator.paginate(filter={'eventArns': event_set}):
                entities.extend(page['entities'])
        return entities

    def resolve_resources(self, event):
        """Load the resources affected by the health event in ``event``.

        Uses the Health API in us-east-1. When the resource model's id field
        name contains "arn", the last ``/`` segment of each entity value is
        used as the id; otherwise the entity value is used as-is. Each
        returned resource gets the event ARN appended to ``c7n:HealthEvent``.
        """
        session = utils.local_session(self.policy.resource_manager.session_factory)
        health = session.client('health', region_name='us-east-1')
        he_arn = event['detail']['eventArn']
        resource_arns = self.process_event_arns(health, [he_arn])

        m = self.policy.resource_manager.get_model()
        if 'arn' in m.id.lower():
            resource_ids = [r['entityValue'].rsplit('/', 1)[-1] for r in resource_arns]
        else:
            resource_ids = [r['entityValue'] for r in resource_arns]

        resources = self.policy.resource_manager.get_resources(resource_ids)
        for r in resources:
            r.setdefault('c7n:HealthEvent', []).append(he_arn)
        return resources
@execution.register('cloudtrail')
class CloudTrailMode(LambdaMode):
    """A lambda policy triggered by cloudwatch event rules on cloudtrail api calls."""

    schema = utils.type_schema(
        'cloudtrail',
        rinherit=LambdaMode.schema,
        delay={
            'type': 'integer',
            'description': 'sleep for delay seconds before processing an event',
        },
        events={
            'type': 'array',
            'items': {
                'oneOf': [
                    {'type': 'string'},
                    {
                        'type': 'object',
                        'required': ['event', 'source', 'ids'],
                        'properties': {
                            'source': {'type': 'string'},
                            'ids': {'type': 'string'},
                            'event': {'type': 'string'},
                        },
                    },
                ],
            },
        },
    )

    def validate(self):
        """Check the subscribed events and the resource's trail support.

        String events must be known shortcuts in
        ``CloudWatchEvents.trail_events``; dict events must carry a
        compilable jmespath ``ids`` expression. Child resources must opt in
        via ``supports_trailevents``.
        """
        super(CloudTrailMode, self).validate()
        from c7n import query
        events = self.policy.data['mode'].get('events')
        assert events, "cloud trail mode requires specifiying events to subscribe"
        for spec in events:
            if isinstance(spec, str):
                assert spec in CloudWatchEvents.trail_events, "event shortcut not defined: %s" % spec
            elif isinstance(spec, dict):
                jmespath_compile(spec['ids'])
        manager = self.policy.resource_manager
        is_child = isinstance(manager, query.ChildResourceManager)
        if is_child and not getattr(manager.resource_type, 'supports_trailevents', False):
            raise ValueError(
                "resource:%s does not support cloudtrail mode policies" % (
                    self.policy.resource_type))

    def resolve_resources(self, event):
        # Honour the optional ``delay`` (seconds) before looking resources up.
        delay = self.policy.data.get('mode', {}).get('delay')
        if delay:
            time.sleep(delay)
        return super().resolve_resources(event)
@execution.register('ec2-instance-state')
class EC2InstanceState(LambdaMode):
    """
    Lambda policy invoked on EC2 instance state-change events.

    See `EC2 lifecycles
    <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html>`_
    for more details.
    """

    schema = utils.type_schema(
        'ec2-instance-state',
        events={
            'type': 'array',
            'items': {'enum': [
                'pending', 'running', 'shutting-down',
                'stopped', 'stopping', 'terminated',
            ]},
        },
        rinherit=LambdaMode.schema,
    )
@execution.register('asg-instance-state')
class ASGInstanceState(LambdaMode):
    """Lambda policy invoked when an auto scaling group's EC2 instances change state.

    See `ASG Events
    <https://docs.aws.amazon.com/autoscaling/ec2/userguide/cloud-watch-events.html>`_
    for more details.
    """

    schema = utils.type_schema(
        'asg-instance-state',
        events={
            'type': 'array',
            'items': {'enum': [
                'launch-success', 'launch-failure',
                'terminate-success', 'terminate-failure',
            ]},
        },
        rinherit=LambdaMode.schema,
    )
@execution.register('guard-duty')
class GuardDutyMode(LambdaMode):
    """Incident Response for AWS Guard Duty.

    AWS Guard Duty is a threat detection service that continuously
    monitors for malicious activity and unauthorized behavior. This
    mode lets policies run when Guard Duty raises findings, enabling
    automated incident response. See `Guard Duty
    <https://aws.amazon.com/guardduty/>`_ for more details.
    """

    schema = utils.type_schema('guard-duty', rinherit=LambdaMode.schema)

    supported_resources = ('account', 'ec2', 'iam-user')

    # jmespath expressions locating the resource id inside a finding event.
    id_exprs = {
        'account': jmespath_compile('detail.accountId'),
        'ec2': jmespath_compile('detail.resource.instanceDetails.instanceId'),
        'iam-user': jmespath_compile('detail.resource.accessKeyDetails.userName')}

    def get_member_account_id(self, event):
        return event['detail']['accountId']

    def resolve_resources(self, event):
        """Fetch the single resource named by the finding in ``event``."""
        self.assume_member(event)
        rtype = self.policy.resource_type
        resource_id = self.id_exprs[rtype].search(event)
        found = self.policy.resource_manager.get_resources([resource_id])
        if found and rtype == 'iam-user':
            # Annotate the user with the access key named in the finding.
            key_id = event['detail']['resource']['accessKeyDetails']['accessKeyId']
            found[0]['c7n:AccessKeys'] = {'AccessKeyId': key_id}
        return found

    def validate(self):
        """Reject resource types outside ``supported_resources``.

        :raises ValueError: for unsupported resource types.
        """
        super(GuardDutyMode, self).validate()
        resource = self.policy.data['resource']
        if resource in self.supported_resources:
            return
        raise ValueError(
            "Policy:%s resource:%s Guard duty mode only supported for %s" % (
                self.policy.data['name'],
                resource,
                self.supported_resources))

    def provision(self):
        # Narrow the finding subscription to the matching Guard Duty
        # resource kind before publishing the function.
        filter_for_resource = {'ec2': 'Instance', 'iam-user': 'AccessKey'}
        resource_filter = filter_for_resource.get(self.policy.data['resource'])
        if resource_filter is not None:
            self.policy.data['mode']['resource-filter'] = resource_filter
        return super(GuardDutyMode, self).provision()
@execution.register('config-poll-rule')
class ConfigPollRuleMode(LambdaMode, PullMode):
    """This mode represents a periodic/scheduled AWS config evaluation.

    The primary benefit this mode offers is to support additional resources
    beyond what config supports natively, as it can post evaluations for
    any resource which has a cloudformation type.

    If a resource is natively supported by config it's highly recommended
    to use a `config-rule` mode instead. Deployment will fail unless
    the policy explicitly opts out of that check with `ignore-support-check`.
    This can be useful in cases when a policy resource has native Config
    support, but filters based on related resource attributes.

    :example:

    VPCs have native Config support, but flow logs are a separate resource.
    This policy forces `config-poll-rule` mode to bypass the Config support
    check and evaluate VPC compliance on a schedule.

    .. code-block:: yaml

        policies:
          - name: vpc-flow-logs
            resource: vpc
            mode:
              type: config-poll-rule
              role: arn:aws:iam::{account_id}:role/MyRole
              ignore-support-check: True
            filters:
              - not:
                - type: flow-logs
                  destination-type: "s3"
                  enabled: True
                  status: active
                  traffic-type: all
                  destination: "arn:aws:s3:::mys3flowlogbucket"

    This mode effectively receives no data from config, instead it's
    periodically executed by config and polls and evaluates all
    resources. It is equivalent to a periodic policy, except it also
    pushes resource evaluations to config.
    """
    schema = utils.type_schema(
        'config-poll-rule',
        schedule={'enum': [
            "One_Hour",
            "Three_Hours",
            "Six_Hours",
            "Twelve_Hours",
            "TwentyFour_Hours"]},
        **{'ignore-support-check': {'type': 'boolean'}},
        rinherit=LambdaMode.schema)

    def validate(self):
        """Validate the config-poll-rule settings.

        :raises PolicyValidationError: if no ``schedule`` is set; if the
            resource has a native config type and ``ignore-support-check``
            is not set; if an event ``pattern`` is given; or if the resource
            has no cloudformation type.
        """
        super().validate()
        mode = self.policy.data['mode']
        model = self.policy.resource_manager.resource_type
        if not mode.get('schedule'):
            raise PolicyValidationError(
                "policy:%s config-poll-rule schedule required" % (
                    self.policy.name))
        if model.config_type and not mode.get('ignore-support-check'):
            raise PolicyValidationError(
                "resource:%s fully supported by config and should use mode: config-rule" % (
                    self.policy.resource_type))
        if mode.get('pattern'):
            raise PolicyValidationError(
                "policy:%s AWS Config does not support event pattern filtering" % (
                    self.policy.name))
        if not model.cfn_type:
            # The placeholders were previously never filled in, so the error
            # showed a literal "%s" instead of the policy and resource.
            raise PolicyValidationError((
                'policy:%s resource:%s does not have a cloudformation type'
                ' and is there-fore not supported by config-poll-rule') % (
                    self.policy.name, self.policy.resource_type))

    @staticmethod
    def get_obsolete_evaluations(client, cfg_rule_name, ordering_ts, evaluations):
        """Get list of evaluations that are no longer applicable due to resources being deleted

        Every COMPLIANT/NON_COMPLIANT result previously recorded for the
        rule whose resource id is absent from ``evaluations`` is returned as
        a NOT_APPLICABLE evaluation stamped with ``ordering_ts``.
        """
        latest_resource_ids = {e['ComplianceResourceId'] for e in evaluations}

        obsolete_evaluations = []
        paginator = client.get_paginator('get_compliance_details_by_config_rule')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        old_evals = paginator.paginate(
            ConfigRuleName=cfg_rule_name,
            ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
            PaginationConfig={'PageSize': 100}).build_full_result().get('EvaluationResults', [])

        for old_eval in old_evals:
            eval_res_qual = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']
            old_resource_id = eval_res_qual['ResourceId']
            if old_resource_id not in latest_resource_ids:
                obsolete_evaluations.append({
                    'ComplianceResourceType': eval_res_qual['ResourceType'],
                    'ComplianceResourceId': old_resource_id,
                    'Annotation': 'The rule does not apply.',
                    'ComplianceType': 'NOT_APPLICABLE',
                    'OrderingTimestamp': ordering_ts})
        return obsolete_evaluations

    def _get_client(self):
        return utils.local_session(
            self.policy.session_factory).client('config')

    def put_evaluations(self, client, token, evaluations):
        """Post ``evaluations`` to config in batches of 100, with retries."""
        for eval_set in utils.chunks(evaluations, 100):
            self.policy.resource_manager.retry(
                client.put_evaluations,
                Evaluations=eval_set,
                ResultToken=token)

    def run(self, event, lambda_context):
        """Evaluate the policy and report compliance to AWS Config.

        Resources matched by the policy are NON_COMPLIANT; other resources
        of the same type (found by re-querying without the policy's filters)
        are COMPLIANT; previously evaluated resources absent from both are
        reported NOT_APPLICABLE. Evaluations are posted only when the event
        carries a ``resultToken``.

        :returns: list of the matched resource ids.
        """
        cfg_event = json.loads(event['invokingEvent'])
        resource_type = self.policy.resource_manager.resource_type.cfn_type
        resource_id = self.policy.resource_manager.resource_type.config_id or \
            self.policy.resource_manager.resource_type.id
        client = self._get_client()
        token = event.get('resultToken')
        cfg_rule_name = event['configRuleName']
        ordering_ts = cfg_event['notificationCreationTime']
        # Same policy without filters: enumerates every resource of the type.
        policy_data = self.policy.data.copy()
        policy_data.pop("filters", None)

        matched_resources = set()
        unmatched_resources = set()
        for r in PullMode.run(self):
            matched_resources.add(r[resource_id])
        for r in self.policy.resource_manager.get_resource_manager(
                self.policy.resource_type,
                policy_data).resources():
            if r[resource_id] not in matched_resources:
                unmatched_resources.add(r[resource_id])

        non_compliant_evals = [dict(
            ComplianceResourceType=resource_type,
            ComplianceResourceId=r,
            ComplianceType='NON_COMPLIANT',
            OrderingTimestamp=ordering_ts,
            Annotation='The resource is not compliant with policy:%s.' % (
                self.policy.name))
            for r in matched_resources]
        compliant_evals = [dict(
            ComplianceResourceType=resource_type,
            ComplianceResourceId=r,
            ComplianceType='COMPLIANT',
            OrderingTimestamp=ordering_ts,
            Annotation='The resource is compliant with policy:%s.' % (
                self.policy.name))
            for r in unmatched_resources]
        evaluations = non_compliant_evals + compliant_evals
        obsolete_evaluations = self.get_obsolete_evaluations(
            client, cfg_rule_name, ordering_ts, evaluations)
        evaluations = evaluations + obsolete_evaluations

        if evaluations and token:
            self.put_evaluations(client, token, evaluations)

        return list(matched_resources)
@execution.register('config-rule')
class ConfigRuleMode(LambdaMode):
    """Lambda execution mode that runs the policy as an AWS Config rule.

    The policy is invoked on configuration changes to resources. The
    outcome for the changed resource is written back to AWS Config as an
    evaluation.

    See `AWS Config <https://aws.amazon.com/config/>`_ for more details.
    """
    # Parsed ``invokingEvent`` of the current invocation (set by run()).
    cfg_event = None
    schema = utils.type_schema('config-rule', rinherit=LambdaMode.schema)

    def validate(self):
        """Reject resource types AWS Config can't record and event patterns."""
        super(ConfigRuleMode, self).validate()
        policy = self.policy
        if not policy.resource_manager.resource_type.config_type:
            raise PolicyValidationError(
                "policy:%s AWS Config does not support resource-type:%s" % (
                    policy.name, policy.resource_type))
        if policy.data['mode'].get('pattern'):
            raise PolicyValidationError(
                "policy:%s AWS Config does not support event pattern filtering" % (
                    policy.name))

    def resolve_resources(self, event):
        """Load the single resource described by the event's configuration item."""
        config_source = self.policy.resource_manager.get_source('config')
        item = self.cfg_event['configurationItem']
        return [config_source.load_resource(item)]

    def run(self, event, lambda_context):
        """Evaluate the changed resource and put the result to AWS Config.

        Deleted, unrecorded, or out-of-scope items are reported as
        NOT_APPLICABLE. Otherwise the policy runs, and the resource is
        COMPLIANT when "resources were found" agrees with the mode's
        ``match-compliant`` flag (default False), and NON_COMPLIANT
        otherwise. Returns the resources produced by the policy run
        (``[]`` for not-applicable items).
        """
        self.cfg_event = json.loads(event['invokingEvent'])
        cfg_item = self.cfg_event['configurationItem']
        resources = []

        # TODO config resource type matches policy check
        not_applicable = event.get('eventLeftScope') or cfg_item[
            'configurationItemStatus'] in (
                "ResourceDeleted",
                "ResourceNotRecorded",
                "ResourceDeletedNotRecorded")

        if not_applicable:
            compliance_type = 'NOT_APPLICABLE'
            annotation = 'The rule does not apply.'
        else:
            resources = super(ConfigRuleMode, self).run(event, lambda_context)
            match = self.policy.data['mode'].get('match-compliant', False)
            self.policy.log.info(
                "found resources:%d match-compliant:%s", len(resources or ()), match)
            if bool(match) == bool(resources):
                compliance_type = 'COMPLIANT'
                annotation = 'The resource is compliant with policy:%s.' % (
                    self.policy.name)
            else:
                compliance_type = 'NON_COMPLIANT'
                annotation = 'Resource is not compliant with policy:%s' % (
                    self.policy.name)

        session = utils.local_session(self.policy.session_factory)
        client = session.client('config')
        client.put_evaluations(
            Evaluations=[{
                'ComplianceResourceType': cfg_item['resourceType'],
                'ComplianceResourceId': cfg_item['resourceId'],
                'ComplianceType': compliance_type,
                'Annotation': annotation,
                # TODO ? if not applicable use current timestamp
                'OrderingTimestamp': cfg_item['configurationItemCaptureTime']}],
            ResultToken=event.get('resultToken', 'No token found.'))
        return resources
def get_session_factory(provider_name, options):
    """Return the session factory of the named cloud provider.

    :param provider_name: key of the provider in the ``clouds`` registry.
    :param options: run options passed to the provider's
        ``get_session_factory``.
    :raises RuntimeError: if no provider is registered under that name.
    """
    # Only the registry lookup belongs in the try: a KeyError raised while
    # the provider builds its session factory must propagate as itself,
    # not be misreported as "provider not installed".
    try:
        provider_class = clouds[provider_name]
    except KeyError:
        provider_class = None
    # NOTE(review): the plugin registry may return None for an unknown name
    # instead of raising KeyError; treat both cases as "not installed".
    if provider_class is None:
        raise RuntimeError(
            "%s provider not installed" % provider_name)
    return provider_class().get_session_factory(options)
class PolicyConditionAnd(And):
    """Boolean ``and`` block usable inside a policy's ``conditions``."""

    def get_resource_type_id(self):
        # Conditions are evaluated against a dict of policy variables,
        # which is identified by its ``name`` entry.
        id_key = 'name'
        return id_key
class PolicyConditionOr(Or):
    """Boolean ``or`` block usable inside a policy's ``conditions``."""

    def get_resource_type_id(self):
        # Conditions are evaluated against a dict of policy variables,
        # which is identified by its ``name`` entry.
        id_key = 'name'
        return id_key
class PolicyConditionNot(Not):
    """Boolean ``not`` block usable inside a policy's ``conditions``."""

    def get_resource_type_id(self):
        # Conditions are evaluated against a dict of policy variables,
        # which is identified by its ``name`` entry.
        id_key = 'name'
        return id_key
class PolicyConditions:
    """Execution conditions that decide whether a policy runs at all.

    Conditions are ordinary filters. They are evaluated against a single
    dict of policy variables (``env_vars`` plus name, region, resource,
    provider, account_id, now and policy) rather than against cloud
    resources. The deprecated top-level ``region``, ``start`` and ``end``
    policy fields are converted into equivalent condition filters when
    the conditions are validated.
    """

    filter_registry = FilterRegistry('c7n.policy.filters')
    filter_registry.register('and', PolicyConditionAnd)
    filter_registry.register('or', PolicyConditionOr)
    filter_registry.register('not', PolicyConditionNot)

    def __init__(self, policy, data):
        self.policy = policy
        # for value_from usage / we use the conditions class
        # to mimic a resource manager interface. we can't use
        # the actual resource manager as we're overriding block
        # filters which work w/ resource type metadata and our
        # resource here is effectively the execution variables.
        self.config = self.policy.options
        rm = self.policy.resource_manager
        self._cache = rm._cache
        self.session_factory = rm.session_factory
        # used by c7n-org to extend evaluation conditions
        self.env_vars = {}
        self.update(data)

    def update(self, data):
        """Reset from new policy ``data``; filters are re-parsed by validate()."""
        self.data = data
        self.filters = self.data.get('conditions', [])
        self.initialized = False

    def validate(self):
        """Parse raw condition data (plus converted deprecated fields) once."""
        if not self.initialized:
            # Build a new list rather than extending self.filters in place.
            # Until it is parsed, self.filters is the policy data's own
            # 'conditions' list, and mutating it would write the converted
            # deprecated fields back into the policy data.
            raw_filters = list(self.filters or []) + self.convert_deprecated()
            self.filters = self.filter_registry.parse(raw_filters, self)
            self.initialized = True

    def evaluate(self, event=None):
        """Return True when every condition filter matches the policy variables.

        Logs a skip message for the policy when the conditions fail.
        """
        policy_vars = dict(self.env_vars)
        policy_vars.update({
            'name': self.policy.name,
            'region': self.policy.options.region,
            'resource': self.policy.resource_type,
            'provider': self.policy.provider_name,
            'account_id': self.policy.options.account_id,
            'now': datetime.now(tzutil.tzutc()),
            'policy': self.policy.data
        })

        # note for no filters/conditions, this uses all([]) == true property.
        state = all([f.process([policy_vars], event) for f in self.filters])
        if not state:
            self.policy.log.info(
                'Skipping policy:%s due to execution conditions', self.policy.name)
        return state

    def iter_filters(self, block_end=False):
        """Iterate the condition filters (delegates to the module-level iter_filters)."""
        return iter_filters(self.filters, block_end=block_end)

    def convert_deprecated(self):
        """These deprecated attributes are now recorded as deprecated against the policy."""
        filters = []
        if 'region' in self.policy.data:
            filters.append({'region': self.policy.data['region']})
        if 'start' in self.policy.data:
            filters.append({
                'type': 'value',
                'key': 'now',
                'value_type': 'date',
                'op': 'gte',
                'value': self.policy.data['start']})
        if 'end' in self.policy.data:
            filters.append({
                'type': 'value',
                'key': 'now',
                'value_type': 'date',
                'op': 'lte',
                'value': self.policy.data['end']})
        return filters

    def get_deprecations(self):
        """Return any matching deprecations for the policy fields itself."""
        deprecations = []
        for f in self.filters:
            deprecations.extend(f.get_deprecations())
        return deprecations
class Policy:
    """A single Cloud Custodian policy: a resource type plus filters and actions.

    Wraps the raw policy ``data`` dict and the run ``options``, builds the
    resource manager and execution conditions for it, and dispatches to the
    configured execution mode (``pull`` when the policy has no ``mode``).
    """

    log = logging.getLogger('custodian.policy')

    deprecations = (
        deprecated.field('region', 'region in condition block'),
        deprecated.field('start', 'value filter in condition block'),
        deprecated.field('end', 'value filter in condition block'),
    )

    def __init__(self, data, options, session_factory=None):
        """
        :param data: raw policy dict; must contain ``name``.
        :param options: run options (region, account_id, dryrun, ...).
        :param session_factory: optional; defaults to the session factory
            of the policy's cloud provider.
        """
        self.data = data
        self.options = options
        assert "name" in self.data
        if session_factory is None:
            session_factory = get_session_factory(self.provider_name, options)
        self.session_factory = session_factory
        self.ctx = ExecutionContext(self.session_factory, self, self.options)
        self.resource_manager = self.load_resource_manager()
        self.conditions = PolicyConditions(self, data)

    def __repr__(self):
        return "<Policy resource:%s name:%s region:%s>" % (
            self.resource_type, self.name, self.options.region)

    @property
    def name(self) -> str:
        return self.data['name']

    @property
    def description(self) -> str:
        return self.data.get('description', '')

    @property
    def resource_type(self) -> str:
        return self.data['resource']

    @property
    def provider_name(self) -> str:
        return get_policy_provider(self.data)

    def is_runnable(self, event=None):
        """Return True when the policy's execution conditions pass."""
        return self.conditions.evaluate(event)

    # Runtime circuit breakers
    @property
    def max_resources(self):
        return self.data.get('max-resources')

    @property
    def max_resources_percent(self):
        return self.data.get('max-resources-percent')

    @property
    def tags(self):
        return self.data.get('tags', ())

    def get_cache(self):
        return self.resource_manager._cache

    @property
    def execution_mode(self):
        """Name of the execution mode; ``'pull'`` when no mode is declared."""
        return self.data.get('mode', {'type': 'pull'})['type']

    def get_execution_mode(self):
        """Instantiate the registered execution mode, or return None if unknown."""
        try:
            exec_mode = execution[self.execution_mode]
        except KeyError:
            return None
        # NOTE(review): the plugin registry may return None for an unknown
        # name instead of raising KeyError; handle both so validate() can
        # report an invalid mode instead of failing with a TypeError.
        if exec_mode is None:
            return None
        return exec_mode(self)

    @property
    def is_lambda(self):
        """True when the policy declares any ``mode`` block."""
        return 'mode' in self.data

    def validate(self):
        """Validate conditions, execution mode, dates, resource manager, filters and actions.

        :raises PolicyValidationError: on an unknown execution mode or
            other validation failure.
        """
        self.conditions.validate()
        m = self.get_execution_mode()
        if m is None:
            raise PolicyValidationError(
                "Invalid Execution mode in policy %s" % (self.data,))
        m.validate()
        self.validate_policy_start_stop()
        self.resource_manager.validate()
        for f in self.resource_manager.filters:
            f.validate()
        for a in self.resource_manager.actions:
            a.validate()

    def get_variables(self, variables=None):
        """Get runtime variables for policy interpolation.

        Runtime variables are merged into ``variables`` and the result is
        returned. A non-empty dict is updated in place; otherwise a new dict
        is created. As a side effect, a ``mode.role`` given as a bare role
        name is rewritten in place to a full IAM role ARN for the current
        partition and account.
        """
        # Global policy variable expansion, we have to carry forward on
        # various filter/action local vocabularies. Where possible defer
        # by using a format string.
        #
        # See https://github.com/cloud-custodian/cloud-custodian/issues/2330
        if not variables:
            variables = {}

        partition = utils.get_partition(self.options.region)
        if 'mode' in self.data:
            if 'role' in self.data['mode'] and not self.data['mode']['role'].startswith("arn:aws"):
                self.data['mode']['role'] = "arn:%s:iam::%s:role/%s" % \
                    (partition, self.options.account_id, self.data['mode']['role'])

        variables.update({
            # standard runtime variables for interpolation
            'account': '{account}',
            'account_id': self.options.account_id,
            'partition': partition,
            'region': self.options.region,
            # non-standard runtime variables from local filter/action vocabularies
            #
            # notify action
            'policy': self.data,
            'event': '{event}',
            # mark for op action
            'op': '{op}',
            'action_date': '{action_date}',
            # tag action pyformat-date handling
            # defer expansion until runtime for serverless modes
            'now': (
                utils.DeferredFormatString('now')
                if isinstance(self.get_execution_mode(), ServerlessExecutionMode)
                else utils.FormatDate(datetime.utcnow())
            ),
            # account increase limit action
            'service': '{service}',
            # s3 set logging action :-( see if we can revisit this one.
            'bucket_region': '{bucket_region}',
            'bucket_name': '{bucket_name}',
            'source_bucket_name': '{source_bucket_name}',
            'source_bucket_region': '{source_bucket_region}',
            'target_bucket_name': '{target_bucket_name}',
            'target_prefix': '{target_prefix}',
            'LoadBalancerName': '{LoadBalancerName}'
        })
        return variables

    def expand_variables(self, variables):
        """Expand variables in policy data.

        Updates the policy data in-place, then rebuilds the conditions and
        resource manager from the expanded data.
        """
        # format string values returns a copy
        var_fmt = VarFormat()
        updated = utils.format_string_values(
            self.data, formatter=var_fmt.format, **variables)

        # Several keys should only be expanded at runtime, preserve them.
        if 'member-role' in updated.get('mode', {}):
            updated['mode']['member-role'] = self.data['mode']['member-role']

        # Update ourselves in place
        self.data = updated

        # NOTE update the policy conditions based on the new self.data
        self.conditions.update(self.data)

        # Reload filters/actions using updated data, we keep a reference
        # for some compatibility preservation work.
        m = self.resource_manager
        self.resource_manager = self.load_resource_manager()

        # XXX: Compatibility hack
        # Preserve notify action subject lines which support
        # embedded jinja2 as a passthrough to the mailer.
        for old_a, new_a in zip(m.actions, self.resource_manager.actions):
            if old_a.type == 'notify' and 'subject' in old_a.data:
                new_a.data['subject'] = old_a.data['subject']

    def push(self, event, lambda_ctx=None):
        """Run the policy's execution mode against an incoming event."""
        mode = self.get_execution_mode()
        return mode.run(event, lambda_ctx)

    def provision(self):
        """Provision policy as a lambda function."""
        mode = self.get_execution_mode()
        return mode.provision()

    def poll(self):
        """Query resources and apply policy."""
        mode = self.get_execution_mode()
        return mode.run()

    def get_permissions(self):
        """get permissions needed by this policy"""
        permissions = set()
        permissions.update(self.resource_manager.get_permissions())
        for f in self.resource_manager.filters:
            permissions.update(f.get_permissions())
        for a in self.resource_manager.actions:
            permissions.update(a.get_permissions())
        return permissions

    def _trim_runtime_filters(self):
        # Applies trim_runtime to both condition and resource filters
        # (presumably dropping filters only evaluable at runtime — confirm
        # against c7n.filters.core.trim_runtime).
        from c7n.filters.core import trim_runtime
        trim_runtime(self.conditions.filters)
        trim_runtime(self.resource_manager.filters)

    def __call__(self):
        """Run policy in default mode.

        Dryrun always performs a pull run. A policy whose conditions fail
        returns []. Serverless modes are provisioned, and any other mode is
        run directly.
        """
        mode = self.get_execution_mode()
        if (isinstance(mode, ServerlessExecutionMode) or
                self.options.dryrun):
            self._trim_runtime_filters()

        if self.options.dryrun:
            resources = PullMode(self).run()
        elif not self.is_runnable():
            resources = []
        elif isinstance(mode, ServerlessExecutionMode):
            resources = mode.provision()
        else:
            resources = mode.run()

        return resources

    run = __call__

    def _write_file(self, rel_path, value):
        """This method is no longer called within c7n, and despite being a private
        method, caution is taken here to not break any external callers.
        """
        log.warning("policy _write_file is deprecated, use ctx.output.write_file")
        self.ctx.output.write_file(rel_path, value)

    def load_resource_manager(self):
        """Instantiate the resource manager class registered for the policy's resource."""
        factory = get_resource_class(self.data.get('resource'))
        return factory(self.ctx, self.data)

    def validate_policy_start_stop(self):
        """Check that the policy's ``tz``, ``start`` and ``end`` values parse.

        :raises PolicyValidationError: if ``tz`` is not a usable timezone.
        :raises ValueError: if ``start`` or ``end`` is not a parsable date.
        """
        policy_name = self.data.get('name')
        policy_tz = self.data.get('tz')
        policy_start = self.data.get('start')
        policy_end = self.data.get('end')

        if policy_tz:
            try:
                p_tz = tzutil.gettz(policy_tz)
            except Exception as e:
                raise PolicyValidationError(
                    "Policy: %s TZ not parsable: %s, %s" % (
                        policy_name, policy_tz, e)) from e

            # Type will be tzwin on windows, but tzwin is null on linux
            if not (isinstance(p_tz, tzutil.tzfile) or
                    (tzutil.tzwin and isinstance(p_tz, tzutil.tzwin))):
                raise PolicyValidationError(
                    "Policy: %s TZ not parsable: %s" % (
                        policy_name, policy_tz))

        for i in [policy_start, policy_end]:
            if i:
                try:
                    parser.parse(i)
                except Exception as e:
                    raise ValueError(
                        "Policy: %s Date/Time not parsable: %s, %s" % (policy_name, i, e)) from e

    def get_deprecations(self):
        """Return any matching deprecations for the policy fields itself."""
        return deprecated.check_deprecations(self, "policy")