Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/tags.py: 28%
567 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Generic Resource Tag / Filters and Actions wrapper
6These work for the whole family of resources associated
7to ec2 (subnets, vpc, security-groups, volumes, instances,
8snapshots) and resources that support Amazon's Resource Groups Tagging API
10"""
11from collections import Counter
12from concurrent.futures import as_completed
14from datetime import datetime, timedelta
15from dateutil import tz as tzutil
16from dateutil.parser import parse
18import time
20from c7n.manager import resources as aws_resources
21from c7n.actions import BaseAction as Action, AutoTagUser
22from c7n.exceptions import PolicyValidationError, PolicyExecutionError
23from c7n.resources import load_resources
24from c7n.filters import Filter, OPERATORS
25from c7n.filters.offhours import Time
26from c7n import deprecated, utils
28DEFAULT_TAG = "maid_status"
def register_ec2_tags(filters, actions):
    """Register the EC2-flavoured tag filters and actions.

    :param filters: filter registry exposing ``register(name, klass)``
    :param actions: action registry exposing ``register(name, klass)``
    """
    filter_table = (
        ('marked-for-op', TagActionFilter),
        ('tag-count', TagCountFilter),
    )
    action_table = (
        ('auto-tag-user', AutoTagUser),
        ('mark-for-op', TagDelayedAction),
        ('tag-trim', TagTrim),
        # 'mark' and 'tag' are two names for the same action.
        ('mark', Tag),
        ('tag', Tag),
        # 'unmark', 'untag' and 'remove-tag' are three names for one action.
        ('unmark', RemoveTag),
        ('untag', RemoveTag),
        ('remove-tag', RemoveTag),
        ('rename-tag', RenameTag),
        ('normalize-tag', NormalizeTag),
    )

    for name, klass in filter_table:
        filters.register(name, klass)
    for name, klass in action_table:
        actions.register(name, klass)
def register_universal_tags(filters, actions, compatibility=True):
    """Register the Resource Groups Tagging API based filters and actions.

    :param filters: filter registry exposing ``register(name, klass)``
    :param actions: action registry exposing ``register(name, klass)``
    :param compatibility: when true, additionally register the
        ``tag-count`` filter and the ``mark`` / ``unmark`` / ``untag``
        action names.
    """
    filters.register('marked-for-op', TagActionFilter)
    if compatibility:
        filters.register('tag-count', TagCountFilter)

    # Build the action list in the same order the names were
    # historically registered in.
    action_table = []
    if compatibility:
        action_table.append(('mark', UniversalTag))
    action_table.extend([
        ('tag', UniversalTag),
        ('auto-tag-user', AutoTagUser),
        ('mark-for-op', UniversalTagDelayedAction),
    ])
    if compatibility:
        action_table.extend([
            ('unmark', UniversalUntag),
            ('untag', UniversalUntag),
        ])
    action_table.extend([
        ('remove-tag', UniversalUntag),
        ('rename-tag', UniversalTagRename),
    ])

    for name, klass in action_table:
        actions.register(name, klass)
def universal_augment(self, resources):
    """Populate ``Tags`` on resources via the Resource Groups Tagging API.

    Resources that already carry a ``Tags`` key are left untouched; the
    rest are looked up by ARN in batches of 100 and get ``Tags`` set to
    the returned tag list (or an empty list when the API returned no
    entry for the ARN).

    https://docs.aws.amazon.com/awsconsolehelpdocs/latest/gsg/supported-resources.html

    :param self: resource manager providing ``resource_type``, ``region``,
        ``session_factory`` and ``get_arns``
    :param resources: list of resource dicts, modified in place
    :return: the same ``resources`` object
    """
    # Nothing to look up for an empty set.
    if not resources:
        return resources

    # For global resources, tags don't populate in the get_resources call
    # unless the call is being made to us-east-1
    if getattr(self.resource_type, 'global_resource', None):
        region = 'us-east-1'
    else:
        region = self.region

    session = utils.local_session(self.session_factory)
    client = session.client('resourcegroupstaggingapi', region_name=region)

    # Imported lazily to avoid a circular import.
    from c7n.query import RetryPageIterator
    # NOTE: the paginator is configured here, but the batched lookups
    # below call client.get_resources directly.
    paginator = client.get_paginator('get_resources')
    paginator.PAGE_ITERATOR_CLS = RetryPageIterator

    untagged = [r for r in resources if 'Tags' not in r]

    for batch in utils.chunks(zip(self.get_arns(untagged), untagged), 100):
        by_arn = dict(batch)
        response = client.get_resources(ResourceARNList=list(by_arn.keys()))
        mappings = response.get('ResourceTagMappingList', ())
        tags_by_arn = {m['ResourceARN']: m['Tags'] for m in mappings}
        for arn, resource in by_arn.items():
            resource['Tags'] = tags_by_arn.get(arn, [])

    return resources
def _common_tag_processer(executor_factory, batch_size, concurrency, client,
                          process_resource_set, id_key, resources, tags,
                          log):
    """Fan a tagging operation out over batches of resources.

    ``resources`` is split into chunks of ``batch_size`` and each chunk is
    submitted to an executor as ``process_resource_set(client, chunk, tags)``.
    Every failure is logged; after all batches have finished, the last
    failure observed (if any) is re-raised.

    ``id_key`` is accepted for signature compatibility and is not used here.
    """
    last_error = None
    with executor_factory(max_workers=concurrency) as pool:
        pending = [
            pool.submit(process_resource_set, client, batch, tags)
            for batch in utils.chunks(resources, size=batch_size)]

        for done in as_completed(pending):
            exc = done.exception()
            if exc:
                last_error = exc
                log.error("Exception with tags: %s %s", tags, exc)

    if last_error:
        raise last_error
class TagTrim(Action):
    """Automatically remove tags from an ec2 resource.

    EC2 Resources have a limit of 50 tags, in order to make
    additional tags space on a set of resources, this action can
    be used to remove enough tags to make the desired amount of
    space while preserving a given set of tags.

    .. code-block :: yaml

       policies:
         - name: ec2-tag-trim
           comment: |
             Any instances with 48 or more tags get tags removed until
             they match the target tag count, in this case 47 so
             that we free up tag slots for another usage.
           resource: ec2
           filters:
             # Filter down to resources which already have 48 tags
             # as we need space for 3 more, this also ensures that
             # metrics reporting is correct for the policy.
             - type: value
               key: "length(Tags)"
               op: ge
               value: 48
           actions:
             - type: tag-trim
               space: 3
               preserve:
                 - OwnerContact
                 - ASV
                 - CMDBEnvironment
                 - downtime
                 - custodian_status
    """
    max_tag_count = 50

    schema = utils.type_schema(
        'tag-trim',
        space={'type': 'integer'},
        preserve={'type': 'array', 'items': {'type': 'string'}})
    schema_alias = True

    permissions = ('ec2:DeleteTags',)

    def process(self, resources):
        """Trim tags on each resource; failures are logged, not raised."""
        self.id_key = self.manager.get_model().id

        # 'preserve' is optional in the schema; treat a missing or null
        # value as "preserve nothing" instead of failing on set(None).
        self.preserve = set(self.data.get('preserve') or ())
        self.space = self.data.get('space', 3)

        client = utils.local_session(
            self.manager.session_factory).client(self.manager.resource_type.service)

        futures = {}
        mid = self.manager.get_model().id

        with self.executor_factory(max_workers=2) as w:
            for r in resources:
                futures[w.submit(self.process_resource, client, r)] = r
            for f in as_completed(futures):
                if f.exception():
                    self.log.warning(
                        "Error processing tag-trim on resource:%s",
                        futures[f][mid])

    def process_resource(self, client, i):
        """Work out which tags to drop from resource ``i`` and remove them.

        Tags whose key starts with ``aws:`` are never considered. When
        ``space`` is non-zero, only as many non-preserved tags as needed
        to leave ``space`` free slots are removed (lowest keys first in
        sorted order); when ``space`` is 0, every non-preserved tag is
        removed.
        """
        # Can't really go in batch parallel without some heuristics
        # without some more complex matching wrt to grouping resources
        # by common tags populations.
        tag_map = {
            t['Key']: t['Value'] for t in i.get('Tags', [])
            if not t['Key'].startswith('aws:')}

        # Space == 0 means remove all but specified
        if self.space and len(tag_map) + self.space <= self.max_tag_count:
            return

        keys = set(tag_map)
        preserve = self.preserve.intersection(keys)
        candidates = keys - self.preserve

        if self.space:
            # Free up slots to fit
            remove = len(candidates) - (
                self.max_tag_count - (self.space + len(preserve)))
            candidates = list(sorted(candidates))[:remove]

        if not candidates:
            self.log.warning(
                "Could not find any candidates to trim %s" % i[self.id_key])
            return

        # The client must be forwarded: process_tag_removal takes
        # (client, resource, tags).
        self.process_tag_removal(client, i, candidates)

    def process_tag_removal(self, client, resource, tags):
        """Delete the tag keys in ``tags`` from ``resource`` using ``client``."""
        self.manager.retry(
            client.delete_tags,
            Tags=[{'Key': c} for c in tags],
            Resources=[resource[self.id_key]],
            DryRun=self.manager.config.dryrun)
class TagActionFilter(Filter):
    """Filter resources for tag specified future action

    Filters resources by a 'maid_status' tag which specifies a future
    date for an action.

    The filter parses the tag values looking for an 'op@date'
    string. The date is parsed and compared to today's date, the
    filter succeeds if today's date is gte to the target date.

    The optional 'skew' parameter provides for incrementing today's
    date a number of days into the future. An example use case might
    be sending a final notice email a few days before terminating an
    instance, or snapshotting a volume prior to deletion.

    The optional 'skew_hours' parameter provides for incrementing the current
    time a number of hours into the future.

    Optionally, the 'tz' parameter can get used to specify the timezone
    in which to interpret the clock (default value is 'utc')

    .. code-block :: yaml

      policies:
        - name: ec2-stop-marked
          resource: ec2
          filters:
            - type: marked-for-op
              # The default tag used is maid_status
              # but that is configurable
              tag: custodian_status
              op: stop
              # Another optional tag is skew
              tz: utc
          actions:
            - type: stop

    """
    schema = utils.type_schema(
        'marked-for-op',
        tag={'type': 'string'},
        tz={'type': 'string'},
        skew={'type': 'number', 'minimum': 0},
        skew_hours={'type': 'number', 'minimum': 0},
        op={'type': 'string'})
    schema_alias = True

    def validate(self):
        """Check that 'op' names a registered action and 'tz' resolves.

        :raises PolicyValidationError: on an unknown op or timezone
        :return: self
        """
        op = self.data.get('op')
        if self.manager and op not in self.manager.action_registry.keys():
            raise PolicyValidationError(
                "Invalid marked-for-op op:%s in %s" % (op, self.manager.data))

        # NOTE(review): an alias lookup miss passes None to gettz; confirm
        # whether that still yields a truthy (local) timezone and so lets
        # an unknown 'tz' through this check.
        tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
        if not tz:
            raise PolicyValidationError(
                "Invalid timezone specified '%s' in %s" % (
                    self.data.get('tz'), self.manager.data))
        return self

    def __call__(self, i):
        """Return True when resource ``i`` is marked for ``op`` and is due.

        The tag value is expected to end in ``:<op>@<date>``. Values that
        are missing, malformed, for a different op, or carry an unparsable
        date yield False.
        """
        tag = self.data.get('tag', DEFAULT_TAG)
        op = self.data.get('op', 'stop')
        skew = self.data.get('skew', 0)
        skew_hours = self.data.get('skew_hours', 0)
        tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))

        v = None
        for n in i.get('Tags', ()):
            if n['Key'] == tag:
                v = n['Value']
                break

        if v is None:
            return False
        if ':' not in v or '@' not in v:
            return False

        _, tgt = v.rsplit(':', 1)
        tgt = tgt.strip()
        # The '@' must be in the segment after the last ':'; a value such
        # as "a@b: text" would otherwise fail to unpack below.
        if '@' not in tgt:
            return False
        action, action_date_str = tgt.split('@', 1)

        if action != op:
            return False

        try:
            action_date = parse(action_date_str)
        except Exception:
            # Use the resource model's id key when a manager is attached so
            # that logging does not raise KeyError on non-EC2 resources.
            id_key = self.manager.get_model().id if self.manager else 'InstanceId'
            self.log.warning("could not parse tag:%s value:%s on %s" % (
                tag, v, i.get(id_key)))
            return False

        if action_date.tzinfo:
            # if action_date is timezone aware, set to timezone provided
            action_date = action_date.astimezone(tz)
            current_date = datetime.now(tz=tz)
        else:
            # Naive dates are compared against the naive local clock.
            current_date = datetime.now()

        return current_date >= (
            action_date - timedelta(days=skew, hours=skew_hours))
class TagCountFilter(Filter):
    """Simplify tag counting..

    ie. these two blocks are equivalent

    .. code-block :: yaml

       - filters:
           - type: value
             op: gte
             count: 8

       - filters:
           - type: tag-count
             count: 8
    """
    schema = utils.type_schema(
        'tag-count',
        count={'type': 'integer', 'minimum': 0},
        op={'enum': list(OPERATORS.keys())})
    schema_alias = True

    def __call__(self, i):
        """Compare the number of non-``aws:`` tags on ``i`` with ``count``.

        ``count`` defaults to 10 and ``op`` to ``gte``.
        """
        threshold = self.data.get('count', 10)
        compare = OPERATORS.get(self.data.get('op', 'gte'))
        # Tags whose key starts with 'aws:' are excluded from the count.
        user_tag_total = sum(
            1 for t in i.get('Tags', [])
            if not t['Key'].startswith('aws:'))
        return compare(user_tag_total, threshold)
class Tag(Action):
    """Apply one or more tags to ec2 resources.

    Tags may be given as a ``tags`` mapping and/or as a single
    ``key``/``value`` pair (legacy spellings: ``tag``/``msg``).
    """

    batch_size = 25
    concurrency = 2

    deprecations = (
        deprecated.alias('mark'),
    )

    schema = utils.type_schema(
        'tag', aliases=('mark',),
        tags={'type': 'object'},
        key={'type': 'string'},
        value={'type': 'string'},
        tag={'type': 'string'},
    )
    schema_alias = True
    permissions = ('ec2:CreateTags',)
    id_key = None

    def validate(self):
        """Reject policies that set both ``key`` and ``tag``."""
        has_key = self.data.get('key')
        has_tag = self.data.get('tag')
        if has_key and has_tag:
            raise PolicyValidationError(
                "Can't specify both key and tag, choose one in %s" % (
                    self.manager.data,))
        return self

    def process(self, resources):
        """Build the tag list from the policy data and apply it in batches."""
        # 'value' wins over the legacy 'msg'; 'key' wins over the legacy 'tag'.
        tag_value = self.data.get('value') or self.data.get('msg')
        tag_key = self.data.get('key') or self.data.get('tag', DEFAULT_TAG)

        # Support setting multiple tags in a single go with a mapping
        mapping = self.data.get('tags')
        tags = []
        if mapping is not None:
            tags = [{'Key': k, 'Value': v} for k, v in mapping.items()]

        if tag_value:
            tags.append({'Key': tag_key, 'Value': tag_value})

        self.interpolate_values(tags)

        chunk = self.data.get('batch_size', self.batch_size)

        client = self.get_client()
        _common_tag_processer(
            self.executor_factory, chunk, self.concurrency, client,
            self.process_resource_set, self.id_key, resources, tags, self.log)

    def process_resource_set(self, client, resource_set, tags):
        """Create ``tags`` on every resource in ``resource_set``."""
        id_field = self.manager.get_model().id
        resource_ids = [res[id_field] for res in resource_set]
        self.manager.retry(
            client.create_tags,
            Resources=resource_ids,
            Tags=tags,
            DryRun=self.manager.config.dryrun)

    def interpolate_single_value(self, tag):
        """Format one tag value with ``account_id``, ``now`` and ``region``.
        """
        config = self.manager.config
        return str(tag).format(
            account_id=config.account_id,
            now=utils.FormatDate.utcnow(),
            region=config.region)

    def interpolate_values(self, tags):
        """Interpolate each value in a list of ``{'Key', 'Value'}`` dicts
        (the 'old' ec2 format), in place.
        """
        for entry in tags:
            entry['Value'] = self.interpolate_single_value(entry['Value'])

    def get_client(self):
        """Return a client for the managed resource type's service."""
        session = utils.local_session(self.manager.session_factory)
        return session.client(self.manager.resource_type.service)
class RemoveTag(Action):
    """Delete the listed tag keys from ec2 resources.

    When ``tags`` is not given, the default status tag is removed.
    """

    deprecations = (
        deprecated.alias('unmark'),
        deprecated.alias('untag'),
    )

    batch_size = 100
    concurrency = 2

    schema = utils.type_schema(
        'remove-tag', aliases=('unmark', 'untag', 'remove-tag'),
        tags={'type': 'array', 'items': {'type': 'string'}})
    schema_alias = True
    permissions = ('ec2:DeleteTags',)

    def process(self, resources):
        """Remove the configured tag keys from ``resources`` in batches."""
        self.id_key = self.manager.get_model().id

        keys_to_drop = self.data.get('tags', [DEFAULT_TAG])
        chunk = self.data.get('batch_size', self.batch_size)

        client = self.get_client()
        _common_tag_processer(
            self.executor_factory, chunk, self.concurrency, client,
            self.process_resource_set, self.id_key, resources,
            keys_to_drop, self.log)

    def process_resource_set(self, client, resource_set, tag_keys):
        """Delete ``tag_keys`` from every resource in ``resource_set``."""
        resource_ids = [res[self.id_key] for res in resource_set]
        key_specs = [{'Key': name} for name in tag_keys]
        return self.manager.retry(
            client.delete_tags,
            Resources=resource_ids,
            Tags=key_specs,
            DryRun=self.manager.config.dryrun)

    def get_client(self):
        """Return a client for the managed resource type's service."""
        session = utils.local_session(self.manager.session_factory)
        return session.client(self.manager.resource_type.service)
class RenameTag(Action):
    """Rename a tag key: create ``new_key`` with the old value, then
    remove ``old_key``.
    """

    schema = utils.type_schema(
        'rename-tag',
        old_key={'type': 'string'},
        new_key={'type': 'string'})
    schema_alias = True

    permissions = ('ec2:CreateTags', 'ec2:DeleteTags')

    tag_count_max = 50

    def delete_tag(self, client, ids, key, value):
        """Delete the ``key``/``value`` tag from the resources in ``ids``."""
        spec = {'Key': key, 'Value': value}
        client.delete_tags(Resources=ids, Tags=[spec])

    def create_tag(self, client, ids, key, value):
        """Create the ``key``/``value`` tag on the resources in ``ids``."""
        spec = {'Key': key, 'Value': value}
        client.create_tags(Resources=ids, Tags=[spec])

    def process_rename(self, client, tag_value, resource_set):
        """
        Move source tag value to destination tag value

        - Collect value from old tag
        - Delete old tag
        - Create new tag & assign stored value
        """
        self.log.info("Renaming tag on %s instances" % (len(resource_set)))
        old_key = self.data.get('old_key')
        new_key = self.data.get('new_key')

        def tag_total(resource):
            return len(resource.get('Tags', []))

        # Resources below the tag limit get the new tag before the old
        # one is removed.
        with_room = [
            r[self.id_key] for r in resource_set
            if tag_total(r) < self.tag_count_max]
        if with_room:
            self.create_tag(client, with_room, new_key, tag_value)

        everyone = [r[self.id_key] for r in resource_set]
        self.delete_tag(client, everyone, old_key, tag_value)

        # Resources already at the limit can only receive the new tag once
        # the old one has been deleted.
        at_limit = [
            r[self.id_key] for r in resource_set
            if tag_total(r) >= self.tag_count_max]
        if at_limit:
            self.create_tag(client, at_limit, new_key, tag_value)

    def create_set(self, instances):
        """Group ``instances`` by the value of their ``old_key`` tag."""
        old_key = self.data.get('old_key', None)
        grouped = {}
        for inst in instances:
            by_key = {t['Key']: t['Value'] for t in inst.get('Tags', [])}
            grouped.setdefault(by_key[old_key], []).append(inst)
        return grouped

    def filter_resources(self, resources):
        """Keep only the resources that carry the ``old_key`` tag."""
        old_key = self.data.get('old_key', None)
        return [
            res for res in resources
            if any(t['Key'] == old_key for t in res.get('Tags', []))]

    def process(self, resources):
        """Rename the tag on every resource that has ``old_key``.

        Failures are logged per value group and do not propagate.
        Returns the filtered resource list.
        """
        before = len(resources)
        resources = self.filter_resources(resources)
        self.log.info(
            "Filtered from %s resources to %s" % (before, len(resources)))
        self.id_key = self.manager.get_model().id
        groups = self.create_set(resources)

        client = self.get_client()
        with self.executor_factory(max_workers=3) as pool:
            pending = [
                pool.submit(self.process_rename, client, value, members)
                for value, members in groups.items()]
            for done in as_completed(pending):
                exc = done.exception()
                if exc:
                    self.log.error(
                        "Exception renaming tag set \n %s" % (exc))
        return resources

    def get_client(self):
        """Return a client for the managed resource type's service."""
        session = utils.local_session(self.manager.session_factory)
        return session.client(self.manager.resource_type.service)
class TagDelayedAction(Action):
    """Tag resources for future action.

    The optional 'tz' parameter can be used to adjust the clock to align
    with a given timezone. The default value is 'utc'.

    If neither 'days' nor 'hours' is specified, Cloud Custodian will default
    to marking the resource for action 4 days in the future.

    .. code-block :: yaml

      policies:
        - name: ec2-mark-for-stop-in-future
          resource: ec2
          filters:
            - type: value
              key: Name
              value: instance-to-stop-in-four-days
          actions:
            - type: mark-for-op
              op: stop
    """
    deprecations = (
        deprecated.optional_fields(('hours', 'days')),
    )

    schema = utils.type_schema(
        'mark-for-op',
        tag={'type': 'string'},
        msg={'type': 'string'},
        days={'type': 'number', 'minimum': 0},
        hours={'type': 'number', 'minimum': 0},
        tz={'type': 'string'},
        op={'type': 'string'})
    schema_alias = True

    batch_size = 200
    concurrency = 2

    default_template = 'Resource does not meet policy: {op}@{action_date}'

    def get_permissions(self):
        """Return the permissions of the resource's registered 'tag' action."""
        return self.manager.action_registry['tag'].permissions

    def validate(self):
        """Check that 'op' names a registered action and 'tz' resolves.

        :raises PolicyValidationError: on an unknown op or timezone
        :return: self
        """
        op = self.data.get('op')
        if self.manager and op not in self.manager.action_registry.keys():
            raise PolicyValidationError(
                "mark-for-op specifies invalid op:%s in %s" % (
                    op, self.manager.data))

        self.tz = tzutil.gettz(
            Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
        if not self.tz:
            # Report the configured name; self.tz is always falsy here.
            raise PolicyValidationError(
                "Invalid timezone specified %s in %s" % (
                    self.data.get('tz'), self.manager.data))
        return self

    def generate_timestamp(self, days, hours):
        """Return the action date string ``days``/``hours`` from now.

        The clock is read in ``self.tz``. A ``days`` of None falls back
        to 4; an ``hours`` of None is treated as 0. The result includes a
        time-of-day and zone only when ``hours`` is positive.
        """
        n = datetime.now(tz=self.tz)
        if days is None:
            # maintains default value of days being 4 if nothing is provided
            days = 4
        if hours is None:
            # timedelta() and the comparison below both reject None.
            hours = 0
        action_date = (n + timedelta(days=days, hours=hours))
        if hours > 0:
            action_date_string = action_date.strftime('%Y/%m/%d %H%M %Z')
        else:
            action_date_string = action_date.strftime('%Y/%m/%d')

        return action_date_string

    def get_config_values(self):
        """Return the effective op/tag/msg/tz/days/hours plus 'action_date'.

        Relies on ``self.tz`` having been set beforehand (by ``validate``
        or ``process``).
        """
        d = {
            'op': self.data.get('op', 'stop'),
            'tag': self.data.get('tag', DEFAULT_TAG),
            'msg': self.data.get('msg', self.default_template),
            'tz': self.data.get('tz', 'utc'),
            'days': self.data.get('days', 0),
            'hours': self.data.get('hours', 0)}
        d['action_date'] = self.generate_timestamp(
            d['days'], d['hours'])
        return d

    def process(self, resources):
        """Tag ``resources`` with the rendered ``op@date`` message."""
        # self.tz must be set before get_config_values(), which calls
        # generate_timestamp(); validate() may not have run on this instance.
        self.tz = tzutil.gettz(
            Time.TZ_ALIASES.get(self.data.get('tz', 'utc')))
        cfg = self.get_config_values()
        self.id_key = self.manager.get_model().id

        msg = cfg['msg'].format(
            op=cfg['op'], action_date=cfg['action_date'])

        self.log.info("Tagging %d resources for %s on %s" % (
            len(resources), cfg['op'], cfg['action_date']))

        tags = [{'Key': cfg['tag'], 'Value': msg}]

        # if the tag implementation has a specified batch size, it's typically
        # due to some restraint on the api so we defer to that.
        batch_size = getattr(
            self.manager.action_registry.get('tag'), 'batch_size', self.batch_size)

        client = self.get_client()
        _common_tag_processer(
            self.executor_factory, batch_size, self.concurrency, client,
            self.process_resource_set, self.id_key, resources, tags, self.log)

    def process_resource_set(self, client, resource_set, tags):
        """Delegate the actual tagging to the resource's 'tag' action."""
        tagger = self.manager.action_registry['tag']({}, self.manager)
        tagger.process_resource_set(client, resource_set, tags)

    def get_client(self):
        """Return a client for the managed resource type's service."""
        return utils.local_session(
            self.manager.session_factory).client(
                self.manager.resource_type.service)
class NormalizeTag(Action):
    """Transform the value of a tag.

    Set the tag value to uppercase, title, lowercase, or strip text
    from a tag value.

    .. code-block :: yaml

        policies:
          - name: ec2-service-transform-lower
            resource: ec2
            comment: |
              ec2-service-tag-value-to-lower
            query:
              - instance-state-name: running
            filters:
              - "tag:testing8882": present
            actions:
              - type: normalize-tag
                key: lower_key
                action: lower

          - name: ec2-service-strip
            resource: ec2
            comment: |
              ec2-service-tag-strip-blah
            query:
              - instance-state-name: running
            filters:
              - "tag:testing8882": present
            actions:
              - type: normalize-tag
                key: strip_key
                action: strip
                value: blah

    """

    schema_alias = True
    schema = utils.type_schema(
        'normalize-tag',
        key={'type': 'string'},
        # A missing comma previously fused 'title' and 'strip' into the
        # single entry 'titlestrip'.
        # NOTE(review): 'items' sits under a string-typed property, so this
        # enum is likely not enforced by schema validation; 'replace' is
        # listed but process() has no branch for it.
        action={'type': 'string',
                'items': {
                    'enum': ['upper', 'lower', 'title', 'strip', 'replace']}},
        value={'type': 'string'})

    permissions = ('ec2:CreateTags',)

    def create_tag(self, client, ids, key, value):
        """Create (overwrite) the ``key``/``value`` tag on ``ids``."""
        self.manager.retry(
            client.create_tags,
            Resources=ids,
            Tags=[{'Key': key, 'Value': value}])

    def process_transform(self, tag_value, resource_set):
        """
        Transform tag value

        - Collect value from tag
        - Transform Tag value
        - Assign new value for key
        """
        self.log.info("Transforming tag value on %s instances" % (
            len(resource_set)))
        key = self.data.get('key')

        # Only resources holding fewer than 50 tags are updated.
        ids = [r[self.id_key] for r in resource_set if len(
            r.get('Tags', [])) < 50]
        if not ids:
            # Avoid issuing a create_tags call with no resources.
            return

        c = utils.local_session(self.manager.session_factory).client('ec2')
        self.create_tag(c, ids, key, tag_value)

    def create_set(self, instances):
        """Group ``instances`` by the current value of the ``key`` tag."""
        key = self.data.get('key', None)
        resource_set = {}
        for r in instances:
            tags = {t['Key']: t['Value'] for t in r.get('Tags', [])}
            resource_set.setdefault(tags[key], []).append(r)
        return resource_set

    def filter_resources(self, resources):
        """Keep only the resources that carry the ``key`` tag."""
        key = self.data.get('key', None)
        return [
            r for r in resources
            if key in (t['Key'] for t in r.get('Tags', []))
        ]

    def process(self, resources):
        """Apply the configured transformation to the ``key`` tag's value.

        Value groups whose transformed value is unchanged or empty are
        skipped. Failures are logged per group and do not propagate.
        Returns the filtered resource list.
        """
        count = len(resources)
        resources = self.filter_resources(resources)
        self.log.info(
            "Filtered from %s resources to %s" % (count, len(resources)))
        self.id_key = self.manager.get_model().id
        resource_set = self.create_set(resources)

        # Loop-invariant policy settings.
        action = self.data.get('action')
        value = self.data.get('value')

        with self.executor_factory(max_workers=3) as w:
            futures = []
            for r in resource_set:
                new_value = False
                if action == 'lower' and not r.islower():
                    new_value = r.lower()
                elif action == 'upper' and not r.isupper():
                    new_value = r.upper()
                elif action == 'title' and not r.istitle():
                    new_value = r.title()
                elif action == 'strip' and value and value in r:
                    # str.strip removes any of the characters in `value`
                    # from both ends, not the substring as a whole.
                    new_value = r.strip(value)
                if new_value:
                    futures.append(
                        w.submit(self.process_transform, new_value, resource_set[r]))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception renaming tag set \n %s" % (
                            f.exception()))
        return resources
class UniversalTag(Tag):
    """Applies one or more tags to the specified resources.

    :example:

        .. code-block :: yaml

            policies:
            - name: multiple-tags-example
              comment: |
                Tags any secrets missing either the Environment or ResourceOwner tag
              resource: aws.secrets-manager
              filters:
                - or:
                  - "tag:Environment": absent
                  - "tag:ResourceOwner": absent
              actions:
                - type: tag
                  tags:
                    Environment: Staging
                    ResourceOwner: Avengers
    """

    batch_size = 20
    concurrency = 1
    permissions = ('tag:TagResources',)

    def process(self, resources):
        """Build the tag mapping from the policy data and apply it in batches."""
        self.id_key = self.manager.get_model().id

        # Legacy
        msg = self.data.get('msg')
        msg = self.data.get('value') or msg

        tag = self.data.get('tag', DEFAULT_TAG)
        tag = self.data.get('key') or tag

        # Support setting multiple tags in a single go with a mapping.
        # Work on a copy: adding the key/value pair and interpolating in
        # place would otherwise rewrite the policy's own 'tags' mapping and
        # re-interpolate already formatted values on a later run.
        tags = dict(self.data.get('tags') or {})

        if msg:
            tags[tag] = msg

        self.interpolate_values(tags)

        batch_size = self.data.get('batch_size', self.batch_size)
        client = self.get_client()

        _common_tag_processer(
            self.executor_factory, batch_size, self.concurrency, client,
            self.process_resource_set, self.id_key, resources, tags, self.log)

    def process_resource_set(self, client, resource_set, tags):
        """Tag the ARNs of ``resource_set`` with the ``tags`` mapping."""
        arns = self.manager.get_arns(resource_set)
        return universal_retry(
            client.tag_resources, ResourceARNList=arns, Tags=tags)

    def interpolate_values(self, tags):
        """Interpolate in a mapping of tags - 'new' resourcegroupstaggingapi
        format. ``tags`` is updated in place.
        """
        for key in list(tags.keys()):
            tags[key] = self.interpolate_single_value(tags[key])

    def get_client(self):
        """Return a resourcegroupstaggingapi client in the right region."""
        # For global resources, manage tags from us-east-1
        region = (getattr(self.manager.resource_type, 'global_resource', None)
                  and 'us-east-1' or self.manager.region)
        return utils.local_session(self.manager.session_factory).client(
            'resourcegroupstaggingapi', region_name=region)
class UniversalUntag(RemoveTag):
    """Remove tag keys from resources through the Resource Groups
    Tagging API.
    """

    batch_size = 20
    concurrency = 1
    permissions = ('tag:UntagResources',)

    def get_client(self):
        """Return a resourcegroupstaggingapi client in the right region."""
        # For global resources, manage tags from us-east-1
        if getattr(self.manager.resource_type, 'global_resource', None):
            region = 'us-east-1'
        else:
            region = self.manager.region
        session = utils.local_session(self.manager.session_factory)
        return session.client('resourcegroupstaggingapi', region_name=region)

    def process_resource_set(self, client, resource_set, tag_keys):
        """Untag the ARNs of ``resource_set`` for every key in ``tag_keys``."""
        target_arns = self.manager.get_arns(resource_set)
        return universal_retry(
            client.untag_resources,
            ResourceARNList=target_arns,
            TagKeys=tag_keys)
class UniversalTagRename(Action):
    """Rename an existing tag key to a new value.

    :example:

    rename Application, and Bap to App, if a resource has both of the old keys
    then we'll use the value specified by Application, which is based on the
    order of values of old_keys.

    .. code-block :: yaml

        policies:
        - name: rename-tags-example
          resource: aws.log-group
          filters:
            - or:
              - "tag:Bap": present
              - "tag:Application": present
          actions:
            - type: rename-tag
              old_keys: [Application, Bap]
              new_key: App
    """
    schema = utils.type_schema(
        'rename-tag',
        old_keys={'type': 'array', 'items': {'type': 'string'}},
        old_key={'type': 'string'},
        new_key={'type': 'string'},
    )

    permissions = UniversalTag.permissions + UniversalUntag.permissions

    def validate(self):
        """Require at least one of 'old_keys' / 'old_key'.

        :raises PolicyValidationError: when neither is configured
        :return: self
        """
        if 'old_keys' not in self.data and 'old_key' not in self.data:
            raise PolicyValidationError(
                f"{self.manager.ctx.policy.name}:{self.type} 'old_keys' or 'old_key' required")
        return self

    def process(self, resources):
        """Copy the first matching old tag's value to ``new_key`` and
        remove every old key found on each resource.
        """
        # Precedence order: 'old_keys' as listed, then 'old_key' if it is
        # not already among them. Indexing self.data['old_keys'] directly
        # failed when only 'old_key' was configured.
        ordered_keys = list(self.data.get('old_keys', ()))
        if 'old_key' in self.data and self.data['old_key'] not in ordered_keys:
            ordered_keys.append(self.data['old_key'])
        rank = {}
        for position, name in enumerate(ordered_keys):
            # First occurrence wins, matching list.index semantics.
            rank.setdefault(name, position)
        old_keys = set(ordered_keys)

        # Collect by distinct tag value, and old_key value to minimize api calls
        # via bulk api usage on add and remove.
        old_key_resources = {}
        values_resources = {}

        # sort tags using ordering of old_keys; other tags sort last
        def key_func(a):
            return rank.get(a['Key'], len(ordered_keys))

        for r in resources:
            found = False
            for t in sorted(r.get('Tags', ()), key=key_func):
                if t['Key'] in old_keys:
                    old_key_resources.setdefault(t['Key'], []).append(r)
                    if not found:
                        # Only the highest-precedence old key supplies
                        # the value for the new key.
                        values_resources.setdefault(t['Value'], []).append(r)
                        found = True

        new_key = self.data['new_key']

        for value, rpopulation in values_resources.items():
            tagger = UniversalTag({'key': new_key, 'value': value}, self.manager)
            tagger.process(rpopulation)
        for old_key, rpopulation in old_key_resources.items():
            cleaner = UniversalUntag({'tags': [old_key]}, self.manager)
            cleaner.process(rpopulation)
class UniversalTagDelayedAction(TagDelayedAction):
    """Mark resources for a future action through the Resource Groups
    Tagging API.

    :example:

        .. code-block :: yaml

            policies:
            - name: ec2-mark-stop
              resource: ec2
              filters:
                - type: image-age
                  op: ge
                  days: 90
              actions:
                - type: mark-for-op
                  tag: custodian_cleanup
                  op: terminate
                  days: 4
    """

    batch_size = 20
    concurrency = 1

    def process(self, resources):
        """Tag ``resources`` with the rendered ``op@date`` message."""
        tz_name = self.data.get('tz', 'utc')
        self.tz = tzutil.gettz(Time.TZ_ALIASES.get(tz_name))
        self.id_key = self.manager.get_model().id

        # Move this to policy? / no resources bypasses actions?
        if len(resources) == 0:
            return

        template = self.data.get('msg', self.default_template)

        op = self.data.get('op', 'stop')
        tag_key = self.data.get('tag', DEFAULT_TAG)
        action_date = self.generate_timestamp(
            self.data.get('days', 0), self.data.get('hours', 0))

        rendered = template.format(op=op, action_date=action_date)

        self.log.info("Tagging %d resources for %s on %s" % (
            len(resources), op, action_date))

        tags = {tag_key: rendered}

        chunk = self.data.get('batch_size', self.batch_size)
        client = self.get_client()

        _common_tag_processer(
            self.executor_factory, chunk, self.concurrency, client,
            self.process_resource_set, self.id_key, resources, tags, self.log)

    def process_resource_set(self, client, resource_set, tags):
        """Tag the ARNs of ``resource_set`` with the ``tags`` mapping."""
        target_arns = self.manager.get_arns(resource_set)
        return universal_retry(
            client.tag_resources, ResourceARNList=target_arns, Tags=tags)

    def get_client(self):
        """Return a resourcegroupstaggingapi client in the right region."""
        # For global resources, manage tags from us-east-1
        if getattr(self.manager.resource_type, 'global_resource', None):
            region = 'us-east-1'
        else:
            region = self.manager.region
        session = utils.local_session(self.manager.session_factory)
        return session.client('resourcegroupstaggingapi', region_name=region)
class CopyRelatedResourceTag(Tag):
    """
    Copy a related resource tag to its associated resource

    In some scenarios, resource tags from a related resource should be applied
    to its child resource. For example, EBS Volume tags propagating to their
    snapshots. To use this action, specify the resource type that contains the
    tags that are to be copied, which can be found by using the
    `custodian schema` command.

    Then, specify the key on the resource that references the related resource.
    In the case of ebs-snapshot, the VolumeId attribute would be the key that
    identifies the related resource, ebs.

    Finally, specify a list of tag keys to copy from the related resource onto
    the original resource. The special character "*" can be used to signify that
    all tags from the related resource should be copied to the original resource.

    By default (`skip_missing: True`) resources whose related resource or
    related tags cannot be found are skipped. To raise an error instead, set
    the `skip_missing` option to False.

    :example:

    .. code-block:: yaml

            policies:
                - name: copy-tags-from-ebs-volume-to-snapshot
                  resource: ebs-snapshot
                  actions:
                    - type: copy-related-tag
                      resource: ebs
                      skip_missing: True
                      key: VolumeId
                      tags: '*'

    In the event that the resource type is not supported in Cloud Custodian but
    is supported in the resources groups tagging api, use the resourcegroupstaggingapi
    resource type to reference the resource. The value should be an ARN for the
    related resource.

    :example:

    .. code-block:: yaml

            policies:
                - name: copy-tags-from-unsupported-resource
                  resource: ebs-snapshot
                  actions:
                    - type: copy-related-tag
                      resource: resourcegroupstaggingapi
                      key: tag:a-resource-tag
                      tags: '*'

    """

    schema = utils.type_schema(
        'copy-related-tag',
        resource={'type': 'string'},
        skip_missing={'type': 'boolean'},
        key={'type': 'string'},
        tags={'oneOf': [
            {'enum': ['*']},
            {'type': 'array'}
        ]},
        required=['tags', 'key', 'resource']
    )
    schema_alias = True

    def get_permissions(self):
        """Reuse the permissions declared by the resource's own 'tag' action."""
        return self.manager.action_registry.get('tag').permissions

    def validate(self):
        """Check the related resource type exists and that tagging is possible.

        :raises PolicyValidationError: if the related resource type is not a
            registered AWS resource (and is not ``resourcegroupstaggingapi``),
            or if the policy's resource has no 'tag' action registered.
        :return: self
        """
        # Accept both 'ebs' and provider-qualified names such as 'aws.ebs'.
        related_resource = self.data['resource'].split('.')[-1]
        load_resources((f'aws.{related_resource}', ))
        if (
            related_resource not in aws_resources.keys() and
            related_resource != "resourcegroupstaggingapi"
        ):
            raise PolicyValidationError(
                "Error: Invalid resource type selected: %s" % related_resource
            )
        # ideally should never raise here since we shouldn't be applying this
        # action to a resource if it doesn't have a tag action implemented
        if self.manager.action_registry.get('tag') is None:
            raise PolicyValidationError(
                "Error: Tag action missing on resource"
            )
        return self

    def process(self, resources):
        """Copy the configured tags from each resource's related resource.

        :param resources: resources of the policy's resource type.
        :raises PolicyExecutionError: when ``skip_missing`` is False and some
            related id or its tags could not be resolved.
        """
        # Build a jmespath expression yielding a one-element list per resource
        # that holds the related resource id (or None when it is absent).
        if self.data['key'].startswith('tag:'):
            tag_key = self.data['key'].split(':', 1)[-1]
            search_path = "[].[Tags[?Key=='%s'].Value | [0]]" % tag_key
        else:
            search_path = "[].[%s]" % self.data['key']

        related_resources = [
            (rrid.pop(), r)
            for rrid, r in zip(
                utils.jmespath_search(search_path, resources), resources)
        ]

        related_ids = {related for related, _ in related_resources}
        # None marks resources that carry no reference to a related resource.
        missing = None in related_ids
        related_ids.discard(None)

        if self.data['resource'] == 'resourcegroupstaggingapi':
            related_tag_map = self.get_resource_tag_map_universal(related_ids)
        else:
            related_tag_map = self.get_resource_tag_map(
                self.data['resource'], related_ids)

        missing_related_tags = related_ids.difference(related_tag_map.keys())
        if not self.data.get('skip_missing', True) and (missing_related_tags or missing):
            raise PolicyExecutionError(
                "Unable to find all %d %s related resources tags %d missing" % (
                    len(related_ids), self.data['resource'],
                    len(missing_related_tags) + int(missing)))

        # rely on resource manager tag action implementation as it can differ between resources
        tag_action = self.manager.action_registry.get('tag')({}, self.manager)
        tag_action.id_key = tag_action.manager.get_model().id
        client = tag_action.get_client()

        stats = Counter()

        for related, r in related_resources:
            if (related is None or
                    related in missing_related_tags or
                    not related_tag_map[related]):
                stats['missing'] += 1
            elif self.process_resource(
                    client, r, related_tag_map[related], self.data['tags'], tag_action):
                stats['tagged'] += 1
            else:
                stats['unchanged'] += 1

        self.log.info(
            'Tagged %d resources from related, missing-skipped %d unchanged %d',
            stats['tagged'], stats['missing'], stats['unchanged'])

    def process_resource(self, client, r, related_tags, tag_keys, tag_action):
        """Apply the selected related tags to a single resource.

        :param client: client obtained from ``tag_action.get_client()``.
        :param r: the resource to tag.
        :param related_tags: ``{key: value}`` tags of the related resource.
        :param tag_keys: ``'*'`` for all tags, else a collection of tag keys.
        :param tag_action: the resource's own tag action instance.
        :return: True if a tagging call was made, False if nothing differed.
        """
        resource_tags = {
            t['Key']: t['Value'] for t in r.get('Tags', [])
            if not t['Key'].startswith('aws:')}

        if tag_keys == '*':
            # 'aws:' prefixed keys are left out of wildcard copies.
            tags = {k: v for k, v in related_tags.items()
                    if resource_tags.get(k) != v and not k.startswith('aws:')}
        else:
            tags = {k: v for k, v in related_tags.items()
                    if k in tag_keys and resource_tags.get(k) != v}
        if not tags:
            return False
        # Non-universal tag actions are handed Key/Value dict lists, while
        # UniversalTag is handed a plain mapping.
        if not isinstance(tag_action, UniversalTag):
            tags = [{'Key': k, 'Value': v} for k, v in tags.items()]
        tag_action.process_resource_set(client, [r], tags)
        return True

    def get_resource_tag_map(self, r_type, ids):
        """
        Returns a mapping of {resource_id: {tagkey: tagvalue}}
        """
        manager = self.manager.get_resource_manager(r_type)
        r_id = manager.resource_type.id

        return {
            r[r_id]: {t['Key']: t['Value'] for t in r.get('Tags', [])}
            for r in manager.get_resources(list(ids))
        }

    def get_resource_tag_map_universal(self, ids):
        """Return ``{arn: {tagkey: tagvalue}}`` via the resource groups tagging API.

        :param ids: iterable of related resource ARNs.
        :return: mapping for every ARN the API returned tag data for; empty
            when ``ids`` is empty.
        """
        ids = list(ids)
        if not ids:
            # GetResources requires at least one ARN in ResourceARNList, so
            # there is nothing to look up (and nothing valid to send).
            return {}

        related_region = None
        if ids[0].startswith('arn:aws:iam'):
            related_region = 'us-east-1'
        client = utils.local_session(
            self.manager.session_factory).client(
                'resourcegroupstaggingapi', region_name=related_region)

        # GetResources accepts at most 100 ARNs per request, and pagination
        # parameters cannot be combined with ResourceARNList, so the lookups
        # are issued as plain calls over fixed-size slices.
        max_arns_per_call = 100
        resource_tag_map = {}
        for start in range(0, len(ids), max_arns_per_call):
            arn_batch = ids[start:start + max_arns_per_call]
            mappings = client.get_resources(
                ResourceARNList=arn_batch)['ResourceTagMappingList']
            for mapping in mappings:
                resource_tag_map[mapping['ResourceARN']] = {
                    t['Key']: t['Value'] for t in mapping['Tags']}
        return resource_tag_map

    @classmethod
    def register_resources(klass, registry, resource_class):
        """Register 'copy-related-tag' on resource classes that have a 'tag' action."""
        if not resource_class.action_registry.get('tag'):
            return
        resource_class.action_registry.register('copy-related-tag', klass)
# Have the AWS resource registry invoke the hook so that resource classes
# with a 'tag' action also get the 'copy-related-tag' action registered.
aws_resources.subscribe(CopyRelatedResourceTag.register_resources)
def universal_retry(method, ResourceARNList, **kw):
    """Retry support for resourcegroup tagging apis.

    The resource group tagging api typically returns a 200 status code
    with embedded resource specific errors. To enable resource specific
    retry on throttles, we extract those, perform backoff w/ jitter and
    continue. Other errors are immediately raised.

    We do not aggregate unified resource responses across retries, only the
    last successful response is returned for a subset of the resources if
    a retry is performed.

    :param method: callable accepting ``ResourceARNList`` plus ``**kw`` and
        returning a mapping that may contain ``FailedResourcesMap``.
    :param ResourceARNList: ARNs to operate on in the first attempt.
    :param kw: extra keyword arguments forwarded to ``method`` on every call.
    :return: the response of the last call made.
    :raises Exception: if any resource fails with an error other than
        ``ThrottlingException`` / ``ResourceNotFoundException``, or if
        resources are still throttled after the final attempt.
    """
    max_attempts = 6

    for idx, delay in enumerate(
            utils.backoff_delays(1.5, 2 ** 8, jitter=True)):
        response = method(ResourceARNList=ResourceARNList, **kw)
        failures = response.get('FailedResourcesMap', {})
        if not failures:
            return response

        errors = {}
        throttles = set()

        for f_arn, failure in failures.items():
            error_code = failure['ErrorCode']
            if error_code == 'ThrottlingException':
                throttles.add(f_arn)
            elif error_code != 'ResourceNotFoundException':
                # Resources that no longer exist are ignored; anything else
                # is a hard error.
                errors[f_arn] = error_code

        if errors:
            raise Exception("Resource Tag Errors %s" % (errors))

        if not throttles:
            # Only not-found failures were reported: nothing is left to
            # retry, and re-invoking with an empty ARN list would be an
            # invalid request.
            return response

        if idx == max_attempts - 1:
            raise Exception("Resource Tag Throttled %s" % (", ".join(throttles)))

        time.sleep(delay)
        # Only the throttled resources need another attempt.
        ResourceARNList = list(throttles)
def coalesce_copy_user_tags(resource, copy_tags, user_tags):
    """
    Returns a list of tags from resource and user supplied in
    the format: [{'Key': 'key', 'Value': 'value'}]

    Due to drift on implementation on copy-tags/tags used throughout
    the code base, the following options are supported:

        copy_tags (Tags to copy from the resource):
          - list of str, e.g. ['key1', 'key2', '*']
          - bool

        user_tags (User supplied tags to apply):
          - dict of key-value pairs, e.g. {Key: Value, Key2: Value}
          - list of dict e.g. [{'Key': k, 'Value': v}]

    In the case that there is a conflict in a user supplied tag
    and an existing tag on the resource, the user supplied tags will
    take priority.

    Additionally, a value of '*' in copy_tags can be used to signify
    to copy all tags from the resource.

    A new list is always returned; the ``user_tags`` argument is never
    modified, so the same value can safely be reused across resources.
    """
    assert isinstance(copy_tags, (bool, list))
    assert isinstance(user_tags, (dict, list))

    # Tolerate both a missing 'Tags' key and an explicit None value.
    r_tags = resource.get('Tags') or []

    copy_all = copy_tags is True or (
        isinstance(copy_tags, list) and '*' in copy_tags)
    if copy_all:
        # Wildcard copies leave out the reserved 'aws:' prefixed tags.
        copy_keys = {
            t['Key'] for t in r_tags if not t['Key'].startswith('aws:')}
    elif isinstance(copy_tags, list):
        copy_keys = set(copy_tags)
    else:
        copy_keys = set()

    if isinstance(user_tags, dict):
        merged = [{'Key': k, 'Value': v} for k, v in user_tags.items()]
    else:
        # Copy so that extending below does not leak into the caller's list.
        merged = list(user_tags)

    user_keys = {t['Key'] for t in merged}
    # User supplied tags win over resource tags with the same key.
    keys_to_copy = copy_keys - user_keys
    merged.extend(t for t in r_tags if t['Key'] in keys_to_copy)
    return merged