Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/tags.py: 28%

567 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Generic Resource Tag / Filters and Actions wrapper 

5 

6These work for the whole family of resources associated 

7to ec2 (subnets, vpc, security-groups, volumes, instances, 

8snapshots) and resources that support Amazon's Resource Groups Tagging API 

9 

10""" 

11from collections import Counter 

12from concurrent.futures import as_completed 

13 

14from datetime import datetime, timedelta 

15from dateutil import tz as tzutil 

16from dateutil.parser import parse 

17 

18import time 

19 

20from c7n.manager import resources as aws_resources 

21from c7n.actions import BaseAction as Action, AutoTagUser 

22from c7n.exceptions import PolicyValidationError, PolicyExecutionError 

23from c7n.resources import load_resources 

24from c7n.filters import Filter, OPERATORS 

25from c7n.filters.offhours import Time 

26from c7n import deprecated, utils 

27 

28DEFAULT_TAG = "maid_status" 

29 

30 

31def register_ec2_tags(filters, actions): 

32 filters.register('marked-for-op', TagActionFilter) 

33 filters.register('tag-count', TagCountFilter) 

34 

35 actions.register('auto-tag-user', AutoTagUser) 

36 actions.register('mark-for-op', TagDelayedAction) 

37 actions.register('tag-trim', TagTrim) 

38 

39 actions.register('mark', Tag) 

40 actions.register('tag', Tag) 

41 

42 actions.register('unmark', RemoveTag) 

43 actions.register('untag', RemoveTag) 

44 actions.register('remove-tag', RemoveTag) 

45 actions.register('rename-tag', RenameTag) 

46 actions.register('normalize-tag', NormalizeTag) 

47 

48 

49def register_universal_tags(filters, actions, compatibility=True): 

50 filters.register('marked-for-op', TagActionFilter) 

51 

52 if compatibility: 

53 filters.register('tag-count', TagCountFilter) 

54 actions.register('mark', UniversalTag) 

55 

56 actions.register('tag', UniversalTag) 

57 actions.register('auto-tag-user', AutoTagUser) 

58 actions.register('mark-for-op', UniversalTagDelayedAction) 

59 

60 if compatibility: 

61 actions.register('unmark', UniversalUntag) 

62 actions.register('untag', UniversalUntag) 

63 

64 actions.register('remove-tag', UniversalUntag) 

65 actions.register('rename-tag', UniversalTagRename) 

66 

67 

68def universal_augment(self, resources): 

69 # Resource Tagging API Support 

70 # https://docs.aws.amazon.com/awsconsolehelpdocs/latest/gsg/supported-resources.html 

71 # Bail on empty set 

72 if not resources: 

73 return resources 

74 

75 # For global resources, tags don't populate in the get_resources call 

76 # unless the call is being made to us-east-1 

77 region = getattr(self.resource_type, 'global_resource', None) and 'us-east-1' or self.region 

78 

79 client = utils.local_session( 

80 self.session_factory).client('resourcegroupstaggingapi', region_name=region) 

81 

82 # Lazy for non circular :-( 

83 from c7n.query import RetryPageIterator 

84 paginator = client.get_paginator('get_resources') 

85 paginator.PAGE_ITERATOR_CLS = RetryPageIterator 

86 

87 rfetch = [r for r in resources if 'Tags' not in r] 

88 

89 for arn_resource_set in utils.chunks( 

90 zip(self.get_arns(rfetch), rfetch), 100): 

91 arn_resource_map = dict(arn_resource_set) 

92 resource_tag_results = client.get_resources( 

93 ResourceARNList=list(arn_resource_map.keys())).get( 

94 'ResourceTagMappingList', ()) 

95 resource_tag_map = { 

96 r['ResourceARN']: r['Tags'] for r in resource_tag_results} 

97 for arn, r in arn_resource_map.items(): 

98 r['Tags'] = resource_tag_map.get(arn, []) 

99 

100 return resources 

101 

102 

103def _common_tag_processer(executor_factory, batch_size, concurrency, client, 

104 process_resource_set, id_key, resources, tags, 

105 log): 

106 

107 error = None 

108 with executor_factory(max_workers=concurrency) as w: 

109 futures = [] 

110 for resource_set in utils.chunks(resources, size=batch_size): 

111 futures.append( 

112 w.submit(process_resource_set, client, resource_set, tags)) 

113 

114 for f in as_completed(futures): 

115 if f.exception(): 

116 error = f.exception() 

117 log.error( 

118 "Exception with tags: %s %s", tags, f.exception()) 

119 

120 if error: 

121 raise error 

122 

123 

124class TagTrim(Action): 

125 """Automatically remove tags from an ec2 resource. 

126 

127 EC2 Resources have a limit of 50 tags, in order to make 

128 additional tags space on a set of resources, this action can 

129 be used to remove enough tags to make the desired amount of 

130 space while preserving a given set of tags. 

131 

132 .. code-block :: yaml 

133 

134 policies: 

135 - name: ec2-tag-trim 

136 comment: | 

137 Any instances with 48 or more tags get tags removed until 

138 they match the target tag count, in this case 47 so we 

139 that we free up a tag slot for another usage. 

140 resource: ec2 

141 filters: 

142 # Filter down to resources which already have 8 tags 

143 # as we need space for 3 more, this also ensures that 

144 # metrics reporting is correct for the policy. 

145 - type: value 

146 key: "length(Tags)" 

147 op: ge 

148 value: 48 

149 actions: 

150 - type: tag-trim 

151 space: 3 

152 preserve: 

153 - OwnerContact 

154 - ASV 

155 - CMDBEnvironment 

156 - downtime 

157 - custodian_status 

158 """ 

159 max_tag_count = 50 

160 

161 schema = utils.type_schema( 

162 'tag-trim', 

163 space={'type': 'integer'}, 

164 preserve={'type': 'array', 'items': {'type': 'string'}}) 

165 schema_alias = True 

166 

167 permissions = ('ec2:DeleteTags',) 

168 

169 def process(self, resources): 

170 self.id_key = self.manager.get_model().id 

171 

172 self.preserve = set(self.data.get('preserve')) 

173 self.space = self.data.get('space', 3) 

174 

175 client = utils.local_session( 

176 self.manager.session_factory).client(self.manager.resource_type.service) 

177 

178 futures = {} 

179 mid = self.manager.get_model().id 

180 

181 with self.executor_factory(max_workers=2) as w: 

182 for r in resources: 

183 futures[w.submit(self.process_resource, client, r)] = r 

184 for f in as_completed(futures): 

185 if f.exception(): 

186 self.log.warning( 

187 "Error processing tag-trim on resource:%s", 

188 futures[f][mid]) 

189 

190 def process_resource(self, client, i): 

191 # Can't really go in batch parallel without some heuristics 

192 # without some more complex matching wrt to grouping resources 

193 # by common tags populations. 

194 tag_map = { 

195 t['Key']: t['Value'] for t in i.get('Tags', []) 

196 if not t['Key'].startswith('aws:')} 

197 

198 # Space == 0 means remove all but specified 

199 if self.space and len(tag_map) + self.space <= self.max_tag_count: 

200 return 

201 

202 keys = set(tag_map) 

203 preserve = self.preserve.intersection(keys) 

204 candidates = keys - self.preserve 

205 

206 if self.space: 

207 # Free up slots to fit 

208 remove = len(candidates) - ( 

209 self.max_tag_count - (self.space + len(preserve))) 

210 candidates = list(sorted(candidates))[:remove] 

211 

212 if not candidates: 

213 self.log.warning( 

214 "Could not find any candidates to trim %s" % i[self.id_key]) 

215 return 

216 

217 self.process_tag_removal(i, candidates) 

218 

219 def process_tag_removal(self, client, resource, tags): 

220 self.manager.retry( 

221 client.delete_tags, 

222 Tags=[{'Key': c} for c in tags], 

223 Resources=[resource[self.id_key]], 

224 DryRun=self.manager.config.dryrun) 

225 

226 

227class TagActionFilter(Filter): 

228 """Filter resources for tag specified future action 

229 

230 Filters resources by a 'maid_status' tag which specifies a future 

231 date for an action. 

232 

233 The filter parses the tag values looking for an 'op@date' 

234 string. The date is parsed and compared to do today's date, the 

235 filter succeeds if today's date is gte to the target date. 

236 

237 The optional 'skew' parameter provides for incrementing today's 

238 date a number of days into the future. An example use case might 

239 be sending a final notice email a few days before terminating an 

240 instance, or snapshotting a volume prior to deletion. 

241 

242 The optional 'skew_hours' parameter provides for incrementing the current 

243 time a number of hours into the future. 

244 

245 Optionally, the 'tz' parameter can get used to specify the timezone 

246 in which to interpret the clock (default value is 'utc') 

247 

248 .. code-block :: yaml 

249 

250 policies: 

251 - name: ec2-stop-marked 

252 resource: ec2 

253 filters: 

254 - type: marked-for-op 

255 # The default tag used is maid_status 

256 # but that is configurable 

257 tag: custodian_status 

258 op: stop 

259 # Another optional tag is skew 

260 tz: utc 

261 actions: 

262 - type: stop 

263 

264 """ 

265 schema = utils.type_schema( 

266 'marked-for-op', 

267 tag={'type': 'string'}, 

268 tz={'type': 'string'}, 

269 skew={'type': 'number', 'minimum': 0}, 

270 skew_hours={'type': 'number', 'minimum': 0}, 

271 op={'type': 'string'}) 

272 schema_alias = True 

273 

274 def validate(self): 

275 op = self.data.get('op') 

276 if self.manager and op not in self.manager.action_registry.keys(): 

277 raise PolicyValidationError( 

278 "Invalid marked-for-op op:%s in %s" % (op, self.manager.data)) 

279 

280 tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) 

281 if not tz: 

282 raise PolicyValidationError( 

283 "Invalid timezone specified '%s' in %s" % ( 

284 self.data.get('tz'), self.manager.data)) 

285 return self 

286 

287 def __call__(self, i): 

288 tag = self.data.get('tag', DEFAULT_TAG) 

289 op = self.data.get('op', 'stop') 

290 skew = self.data.get('skew', 0) 

291 skew_hours = self.data.get('skew_hours', 0) 

292 tz = tzutil.gettz(Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) 

293 

294 v = None 

295 for n in i.get('Tags', ()): 

296 if n['Key'] == tag: 

297 v = n['Value'] 

298 break 

299 

300 if v is None: 

301 return False 

302 if ':' not in v or '@' not in v: 

303 return False 

304 

305 msg, tgt = v.rsplit(':', 1) 

306 action, action_date_str = tgt.strip().split('@', 1) 

307 

308 if action != op: 

309 return False 

310 

311 try: 

312 action_date = parse(action_date_str) 

313 except Exception: 

314 self.log.warning("could not parse tag:%s value:%s on %s" % ( 

315 tag, v, i['InstanceId'])) 

316 return False 

317 

318 if action_date.tzinfo: 

319 # if action_date is timezone aware, set to timezone provided 

320 action_date = action_date.astimezone(tz) 

321 current_date = datetime.now(tz=tz) 

322 else: 

323 current_date = datetime.now() 

324 

325 return current_date >= ( 

326 action_date - timedelta(days=skew, hours=skew_hours)) 

327 

328 

329class TagCountFilter(Filter): 

330 """Simplify tag counting.. 

331 

332 ie. these two blocks are equivalent 

333 

334 .. code-block :: yaml 

335 

336 - filters: 

337 - type: value 

338 op: gte 

339 count: 8 

340 

341 - filters: 

342 - type: tag-count 

343 count: 8 

344 """ 

345 schema = utils.type_schema( 

346 'tag-count', 

347 count={'type': 'integer', 'minimum': 0}, 

348 op={'enum': list(OPERATORS.keys())}) 

349 schema_alias = True 

350 

351 def __call__(self, i): 

352 count = self.data.get('count', 10) 

353 op_name = self.data.get('op', 'gte') 

354 op = OPERATORS.get(op_name) 

355 tag_count = len([ 

356 t['Key'] for t in i.get('Tags', []) 

357 if not t['Key'].startswith('aws:')]) 

358 return op(tag_count, count) 

359 

360 

361class Tag(Action): 

362 """Tag an ec2 resource. 

363 """ 

364 

365 batch_size = 25 

366 concurrency = 2 

367 

368 deprecations = ( 

369 deprecated.alias('mark'), 

370 ) 

371 

372 schema = utils.type_schema( 

373 'tag', aliases=('mark',), 

374 tags={'type': 'object'}, 

375 key={'type': 'string'}, 

376 value={'type': 'string'}, 

377 tag={'type': 'string'}, 

378 ) 

379 schema_alias = True 

380 permissions = ('ec2:CreateTags',) 

381 id_key = None 

382 

383 def validate(self): 

384 if self.data.get('key') and self.data.get('tag'): 

385 raise PolicyValidationError( 

386 "Can't specify both key and tag, choose one in %s" % ( 

387 self.manager.data,)) 

388 return self 

389 

390 def process(self, resources): 

391 # Legacy 

392 msg = self.data.get('msg') 

393 msg = self.data.get('value') or msg 

394 

395 tag = self.data.get('tag', DEFAULT_TAG) 

396 tag = self.data.get('key') or tag 

397 

398 # Support setting multiple tags in a single go with a mapping 

399 tags = self.data.get('tags') 

400 

401 if tags is None: 

402 tags = [] 

403 else: 

404 tags = [{'Key': k, 'Value': v} for k, v in tags.items()] 

405 

406 if msg: 

407 tags.append({'Key': tag, 'Value': msg}) 

408 

409 self.interpolate_values(tags) 

410 

411 batch_size = self.data.get('batch_size', self.batch_size) 

412 

413 client = self.get_client() 

414 _common_tag_processer( 

415 self.executor_factory, batch_size, self.concurrency, client, 

416 self.process_resource_set, self.id_key, resources, tags, self.log) 

417 

418 def process_resource_set(self, client, resource_set, tags): 

419 mid = self.manager.get_model().id 

420 self.manager.retry( 

421 client.create_tags, 

422 Resources=[v[mid] for v in resource_set], 

423 Tags=tags, 

424 DryRun=self.manager.config.dryrun) 

425 

426 def interpolate_single_value(self, tag): 

427 """Interpolate in a single tag value. 

428 """ 

429 params = { 

430 'account_id': self.manager.config.account_id, 

431 'now': utils.FormatDate.utcnow(), 

432 'region': self.manager.config.region} 

433 return str(tag).format(**params) 

434 

435 def interpolate_values(self, tags): 

436 """Interpolate in a list of tags - 'old' ec2 format 

437 """ 

438 for t in tags: 

439 t['Value'] = self.interpolate_single_value(t['Value']) 

440 

441 def get_client(self): 

442 return utils.local_session(self.manager.session_factory).client( 

443 self.manager.resource_type.service) 

444 

445 

446class RemoveTag(Action): 

447 """Remove tags from ec2 resources. 

448 """ 

449 

450 deprecations = ( 

451 deprecated.alias('unmark'), 

452 deprecated.alias('untag'), 

453 ) 

454 

455 batch_size = 100 

456 concurrency = 2 

457 

458 schema = utils.type_schema( 

459 'remove-tag', aliases=('unmark', 'untag', 'remove-tag'), 

460 tags={'type': 'array', 'items': {'type': 'string'}}) 

461 schema_alias = True 

462 permissions = ('ec2:DeleteTags',) 

463 

464 def process(self, resources): 

465 self.id_key = self.manager.get_model().id 

466 

467 tags = self.data.get('tags', [DEFAULT_TAG]) 

468 batch_size = self.data.get('batch_size', self.batch_size) 

469 

470 client = self.get_client() 

471 _common_tag_processer( 

472 self.executor_factory, batch_size, self.concurrency, client, 

473 self.process_resource_set, self.id_key, resources, tags, self.log) 

474 

475 def process_resource_set(self, client, resource_set, tag_keys): 

476 return self.manager.retry( 

477 client.delete_tags, 

478 Resources=[v[self.id_key] for v in resource_set], 

479 Tags=[{'Key': k} for k in tag_keys], 

480 DryRun=self.manager.config.dryrun) 

481 

482 def get_client(self): 

483 return utils.local_session(self.manager.session_factory).client( 

484 self.manager.resource_type.service) 

485 

486 

487class RenameTag(Action): 

488 """ Create a new tag with identical value & remove old tag 

489 """ 

490 

491 schema = utils.type_schema( 

492 'rename-tag', 

493 old_key={'type': 'string'}, 

494 new_key={'type': 'string'}) 

495 schema_alias = True 

496 

497 permissions = ('ec2:CreateTags', 'ec2:DeleteTags') 

498 

499 tag_count_max = 50 

500 

501 def delete_tag(self, client, ids, key, value): 

502 client.delete_tags( 

503 Resources=ids, 

504 Tags=[{'Key': key, 'Value': value}]) 

505 

506 def create_tag(self, client, ids, key, value): 

507 client.create_tags( 

508 Resources=ids, 

509 Tags=[{'Key': key, 'Value': value}]) 

510 

511 def process_rename(self, client, tag_value, resource_set): 

512 """ 

513 Move source tag value to destination tag value 

514 

515 - Collect value from old tag 

516 - Delete old tag 

517 - Create new tag & assign stored value 

518 """ 

519 self.log.info("Renaming tag on %s instances" % (len(resource_set))) 

520 old_key = self.data.get('old_key') 

521 new_key = self.data.get('new_key') 

522 

523 # We have a preference to creating the new tag when possible first 

524 resource_ids = [r[self.id_key] for r in resource_set if len( 

525 r.get('Tags', [])) < self.tag_count_max] 

526 if resource_ids: 

527 self.create_tag(client, resource_ids, new_key, tag_value) 

528 

529 self.delete_tag( 

530 client, [r[self.id_key] for r in resource_set], old_key, tag_value) 

531 

532 # For resources with 50 tags, we need to delete first and then create. 

533 resource_ids = [r[self.id_key] for r in resource_set if len( 

534 r.get('Tags', [])) > self.tag_count_max - 1] 

535 if resource_ids: 

536 self.create_tag(client, resource_ids, new_key, tag_value) 

537 

538 def create_set(self, instances): 

539 old_key = self.data.get('old_key', None) 

540 resource_set = {} 

541 for r in instances: 

542 tags = {t['Key']: t['Value'] for t in r.get('Tags', [])} 

543 if tags[old_key] not in resource_set: 

544 resource_set[tags[old_key]] = [] 

545 resource_set[tags[old_key]].append(r) 

546 return resource_set 

547 

548 def filter_resources(self, resources): 

549 old_key = self.data.get('old_key', None) 

550 filtered_resources = [ 

551 r for r in resources 

552 if old_key in (t['Key'] for t in r.get('Tags', [])) 

553 ] 

554 return filtered_resources 

555 

556 def process(self, resources): 

557 count = len(resources) 

558 resources = self.filter_resources(resources) 

559 self.log.info( 

560 "Filtered from %s resources to %s" % (count, len(resources))) 

561 self.id_key = self.manager.get_model().id 

562 resource_set = self.create_set(resources) 

563 

564 client = self.get_client() 

565 with self.executor_factory(max_workers=3) as w: 

566 futures = [] 

567 for r in resource_set: 

568 futures.append( 

569 w.submit(self.process_rename, client, r, resource_set[r])) 

570 for f in as_completed(futures): 

571 if f.exception(): 

572 self.log.error( 

573 "Exception renaming tag set \n %s" % ( 

574 f.exception())) 

575 return resources 

576 

577 def get_client(self): 

578 return utils.local_session(self.manager.session_factory).client( 

579 self.manager.resource_type.service) 

580 

581 

582class TagDelayedAction(Action): 

583 """Tag resources for future action. 

584 

585 The optional 'tz' parameter can be used to adjust the clock to align 

586 with a given timezone. The default value is 'utc'. 

587 

588 If neither 'days' nor 'hours' is specified, Cloud Custodian will default 

589 to marking the resource for action 4 days in the future. 

590 

591 .. code-block :: yaml 

592 

593 policies: 

594 - name: ec2-mark-for-stop-in-future 

595 resource: ec2 

596 filters: 

597 - type: value 

598 key: Name 

599 value: instance-to-stop-in-four-days 

600 actions: 

601 - type: mark-for-op 

602 op: stop 

603 """ 

604 deprecations = ( 

605 deprecated.optional_fields(('hours', 'days')), 

606 ) 

607 

608 schema = utils.type_schema( 

609 'mark-for-op', 

610 tag={'type': 'string'}, 

611 msg={'type': 'string'}, 

612 days={'type': 'number', 'minimum': 0}, 

613 hours={'type': 'number', 'minimum': 0}, 

614 tz={'type': 'string'}, 

615 op={'type': 'string'}) 

616 schema_alias = True 

617 

618 batch_size = 200 

619 concurrency = 2 

620 

621 default_template = 'Resource does not meet policy: {op}@{action_date}' 

622 

623 def get_permissions(self): 

624 return self.manager.action_registry['tag'].permissions 

625 

626 def validate(self): 

627 op = self.data.get('op') 

628 if self.manager and op not in self.manager.action_registry.keys(): 

629 raise PolicyValidationError( 

630 "mark-for-op specifies invalid op:%s in %s" % ( 

631 op, self.manager.data)) 

632 

633 self.tz = tzutil.gettz( 

634 Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) 

635 if not self.tz: 

636 raise PolicyValidationError( 

637 "Invalid timezone specified %s in %s" % ( 

638 self.tz, self.manager.data)) 

639 return self 

640 

641 def generate_timestamp(self, days, hours): 

642 n = datetime.now(tz=self.tz) 

643 if days is None or hours is None: 

644 # maintains default value of days being 4 if nothing is provided 

645 days = 4 

646 action_date = (n + timedelta(days=days, hours=hours)) 

647 if hours > 0: 

648 action_date_string = action_date.strftime('%Y/%m/%d %H%M %Z') 

649 else: 

650 action_date_string = action_date.strftime('%Y/%m/%d') 

651 

652 return action_date_string 

653 

654 def get_config_values(self): 

655 d = { 

656 'op': self.data.get('op', 'stop'), 

657 'tag': self.data.get('tag', DEFAULT_TAG), 

658 'msg': self.data.get('msg', self.default_template), 

659 'tz': self.data.get('tz', 'utc'), 

660 'days': self.data.get('days', 0), 

661 'hours': self.data.get('hours', 0)} 

662 d['action_date'] = self.generate_timestamp( 

663 d['days'], d['hours']) 

664 return d 

665 

666 def process(self, resources): 

667 cfg = self.get_config_values() 

668 self.tz = tzutil.gettz(Time.TZ_ALIASES.get(cfg['tz'])) 

669 self.id_key = self.manager.get_model().id 

670 

671 msg = cfg['msg'].format( 

672 op=cfg['op'], action_date=cfg['action_date']) 

673 

674 self.log.info("Tagging %d resources for %s on %s" % ( 

675 len(resources), cfg['op'], cfg['action_date'])) 

676 

677 tags = [{'Key': cfg['tag'], 'Value': msg}] 

678 

679 # if the tag implementation has a specified batch size, it's typically 

680 # due to some restraint on the api so we defer to that. 

681 batch_size = getattr( 

682 self.manager.action_registry.get('tag'), 'batch_size', self.batch_size) 

683 

684 client = self.get_client() 

685 _common_tag_processer( 

686 self.executor_factory, batch_size, self.concurrency, client, 

687 self.process_resource_set, self.id_key, resources, tags, self.log) 

688 

689 def process_resource_set(self, client, resource_set, tags): 

690 tagger = self.manager.action_registry['tag']({}, self.manager) 

691 tagger.process_resource_set(client, resource_set, tags) 

692 

693 def get_client(self): 

694 return utils.local_session( 

695 self.manager.session_factory).client( 

696 self.manager.resource_type.service) 

697 

698 

699class NormalizeTag(Action): 

700 """Transform the value of a tag. 

701 

702 Set the tag value to uppercase, title, lowercase, or strip text 

703 from a tag key. 

704 

705 .. code-block :: yaml 

706 

707 policies: 

708 - name: ec2-service-transform-lower 

709 resource: ec2 

710 comment: | 

711 ec2-service-tag-value-to-lower 

712 query: 

713 - instance-state-name: running 

714 filters: 

715 - "tag:testing8882": present 

716 actions: 

717 - type: normalize-tag 

718 key: lower_key 

719 action: lower 

720 

721 - name: ec2-service-strip 

722 resource: ec2 

723 comment: | 

724 ec2-service-tag-strip-blah 

725 query: 

726 - instance-state-name: running 

727 filters: 

728 - "tag:testing8882": present 

729 actions: 

730 - type: normalize-tag 

731 key: strip_key 

732 action: strip 

733 value: blah 

734 

735 """ 

736 

737 schema_alias = True 

738 schema = utils.type_schema( 

739 'normalize-tag', 

740 key={'type': 'string'}, 

741 action={'type': 'string', 

742 'items': { 

743 'enum': ['upper', 'lower', 'title' 'strip', 'replace']}}, 

744 value={'type': 'string'}) 

745 

746 permissions = ('ec2:CreateTags',) 

747 

748 def create_tag(self, client, ids, key, value): 

749 

750 self.manager.retry( 

751 client.create_tags, 

752 Resources=ids, 

753 Tags=[{'Key': key, 'Value': value}]) 

754 

755 def process_transform(self, tag_value, resource_set): 

756 """ 

757 Transform tag value 

758 

759 - Collect value from tag 

760 - Transform Tag value 

761 - Assign new value for key 

762 """ 

763 self.log.info("Transforming tag value on %s instances" % ( 

764 len(resource_set))) 

765 key = self.data.get('key') 

766 

767 c = utils.local_session(self.manager.session_factory).client('ec2') 

768 

769 self.create_tag( 

770 c, 

771 [r[self.id_key] for r in resource_set if len( 

772 r.get('Tags', [])) < 50], 

773 key, tag_value) 

774 

775 def create_set(self, instances): 

776 key = self.data.get('key', None) 

777 resource_set = {} 

778 for r in instances: 

779 tags = {t['Key']: t['Value'] for t in r.get('Tags', [])} 

780 if tags[key] not in resource_set: 

781 resource_set[tags[key]] = [] 

782 resource_set[tags[key]].append(r) 

783 return resource_set 

784 

785 def filter_resources(self, resources): 

786 key = self.data.get('key', None) 

787 filtered_resources = [ 

788 r for r in resources 

789 if key in (t['Key'] for t in r.get('Tags', [])) 

790 ] 

791 return filtered_resources 

792 

793 def process(self, resources): 

794 count = len(resources) 

795 resources = self.filter_resources(resources) 

796 self.log.info( 

797 "Filtered from %s resources to %s" % (count, len(resources))) 

798 self.id_key = self.manager.get_model().id 

799 resource_set = self.create_set(resources) 

800 with self.executor_factory(max_workers=3) as w: 

801 futures = [] 

802 for r in resource_set: 

803 action = self.data.get('action') 

804 value = self.data.get('value') 

805 new_value = False 

806 if action == 'lower' and not r.islower(): 

807 new_value = r.lower() 

808 elif action == 'upper' and not r.isupper(): 

809 new_value = r.upper() 

810 elif action == 'title' and not r.istitle(): 

811 new_value = r.title() 

812 elif action == 'strip' and value and value in r: 

813 new_value = r.strip(value) 

814 if new_value: 

815 futures.append( 

816 w.submit(self.process_transform, new_value, resource_set[r])) 

817 for f in as_completed(futures): 

818 if f.exception(): 

819 self.log.error( 

820 "Exception renaming tag set \n %s" % ( 

821 f.exception())) 

822 return resources 

823 

824 

825class UniversalTag(Tag): 

826 """Applies one or more tags to the specified resources. 

827 

828 :example: 

829 

830 .. code-block :: yaml 

831 

832 policies: 

833 - name: multiple-tags-example 

834 comment: | 

835 Tags any secrets missing either the Environment or ResourceOwner tag 

836 resource: aws.secrets-manager 

837 filters: 

838 - or: 

839 - "tag:Environment": absent 

840 - "tag:ResourceOwner": absent 

841 actions: 

842 - type: tag 

843 tags: 

844 Environment: Staging 

845 ResourceOwner: Avengers 

846 """ 

847 

848 batch_size = 20 

849 concurrency = 1 

850 permissions = ('tag:TagResources',) 

851 

852 def process(self, resources): 

853 self.id_key = self.manager.get_model().id 

854 

855 # Legacy 

856 msg = self.data.get('msg') 

857 msg = self.data.get('value') or msg 

858 

859 tag = self.data.get('tag', DEFAULT_TAG) 

860 tag = self.data.get('key') or tag 

861 

862 # Support setting multiple tags in a single go with a mapping 

863 tags = self.data.get('tags', {}) 

864 

865 if msg: 

866 tags[tag] = msg 

867 

868 self.interpolate_values(tags) 

869 

870 batch_size = self.data.get('batch_size', self.batch_size) 

871 client = self.get_client() 

872 

873 _common_tag_processer( 

874 self.executor_factory, batch_size, self.concurrency, client, 

875 self.process_resource_set, self.id_key, resources, tags, self.log) 

876 

877 def process_resource_set(self, client, resource_set, tags): 

878 arns = self.manager.get_arns(resource_set) 

879 return universal_retry( 

880 client.tag_resources, ResourceARNList=arns, Tags=tags) 

881 

882 def interpolate_values(self, tags): 

883 """Interpolate in a list of tags - 'new' resourcegroupstaggingapi format 

884 """ 

885 for key in list(tags.keys()): 

886 tags[key] = self.interpolate_single_value(tags[key]) 

887 

888 def get_client(self): 

889 # For global resources, manage tags from us-east-1 

890 region = (getattr(self.manager.resource_type, 'global_resource', None) 

891 and 'us-east-1' or self.manager.region) 

892 return utils.local_session(self.manager.session_factory).client( 

893 'resourcegroupstaggingapi', region_name=region) 

894 

895 

896class UniversalUntag(RemoveTag): 

897 """Removes the specified tags from the specified resources. 

898 """ 

899 

900 batch_size = 20 

901 concurrency = 1 

902 permissions = ('tag:UntagResources',) 

903 

904 def get_client(self): 

905 # For global resources, manage tags from us-east-1 

906 region = (getattr(self.manager.resource_type, 'global_resource', None) 

907 and 'us-east-1' or self.manager.region) 

908 return utils.local_session(self.manager.session_factory).client( 

909 'resourcegroupstaggingapi', region_name=region) 

910 

911 def process_resource_set(self, client, resource_set, tag_keys): 

912 arns = self.manager.get_arns(resource_set) 

913 return universal_retry( 

914 client.untag_resources, ResourceARNList=arns, TagKeys=tag_keys) 

915 

916 

917class UniversalTagRename(Action): 

918 """Rename an existing tag key to a new value. 

919 

920 :example: 

921 

922 rename Application, and Bap to App, if a resource has both of the old keys 

923 then we'll use the value specified by Application, which is based on the 

924 order of values of old_keys. 

925 

926 .. code-block :: yaml 

927 

928 policies: 

929 - name: rename-tags-example 

930 resource: aws.log-group 

931 filters: 

932 - or: 

933 - "tag:Bap": present 

934 - "tag:Application": present 

935 actions: 

936 - type: rename-tag 

937 old_keys: [Application, Bap] 

938 new_key: App 

939 """ 

940 schema = utils.type_schema( 

941 'rename-tag', 

942 old_keys={'type': 'array', 'items': {'type': 'string'}}, 

943 old_key={'type': 'string'}, 

944 new_key={'type': 'string'}, 

945 ) 

946 

947 permissions = UniversalTag.permissions + UniversalUntag.permissions 

948 

949 def validate(self): 

950 if 'old_keys' not in self.data and 'old_key' not in self.data: 

951 raise PolicyValidationError( 

952 f"{self.manager.ctx.policy.name}:{self.type} 'old_keys' or 'old_key' required") 

953 

954 def process(self, resources): 

955 old_keys = set(self.data.get('old_keys', ())) 

956 if 'old_key' in self.data: 

957 old_keys.add(self.data['old_key']) 

958 

959 # Collect by distinct tag value, and old_key value to minimize api calls 

960 # via bulk api usage on add and remove. 

961 old_key_resources = {} 

962 values_resources = {} 

963 

964 # sort tags using ordering of old_keys 

965 def key_func(a): 

966 if a['Key'] in old_keys: 

967 return self.data['old_keys'].index(a['Key']) 

968 return 50 

969 

970 for r in resources: 

971 found = False 

972 for t in sorted(r.get('Tags', ()), key=key_func): 

973 if t['Key'] in old_keys: 

974 old_key_resources.setdefault(t['Key'], []).append(r) 

975 if not found: 

976 values_resources.setdefault(t['Value'], []).append(r) 

977 found = True 

978 

979 new_key = self.data['new_key'] 

980 

981 for value, rpopulation in values_resources.items(): 

982 tagger = UniversalTag({'key': new_key, 'value': value}, self.manager) 

983 tagger.process(rpopulation) 

984 for old_key, rpopulation in old_key_resources.items(): 

985 cleaner = UniversalUntag({'tags': [old_key]}, self.manager) 

986 cleaner.process(rpopulation) 

987 

988 

989class UniversalTagDelayedAction(TagDelayedAction): 

990 """Tag resources for future action. 

991 

992 :example: 

993 

994 .. code-block :: yaml 

995 

996 policies: 

997 - name: ec2-mark-stop 

998 resource: ec2 

999 filters: 

1000 - type: image-age 

1001 op: ge 

1002 days: 90 

1003 actions: 

1004 - type: mark-for-op 

1005 tag: custodian_cleanup 

1006 op: terminate 

1007 days: 4 

1008 """ 

1009 

1010 batch_size = 20 

1011 concurrency = 1 

1012 

1013 def process(self, resources): 

1014 self.tz = tzutil.gettz( 

1015 Time.TZ_ALIASES.get(self.data.get('tz', 'utc'))) 

1016 self.id_key = self.manager.get_model().id 

1017 

1018 # Move this to policy? / no resources bypasses actions? 

1019 if not len(resources): 

1020 return 

1021 

1022 msg_tmpl = self.data.get('msg', self.default_template) 

1023 

1024 op = self.data.get('op', 'stop') 

1025 tag = self.data.get('tag', DEFAULT_TAG) 

1026 days = self.data.get('days', 0) 

1027 hours = self.data.get('hours', 0) 

1028 action_date = self.generate_timestamp(days, hours) 

1029 

1030 msg = msg_tmpl.format( 

1031 op=op, action_date=action_date) 

1032 

1033 self.log.info("Tagging %d resources for %s on %s" % ( 

1034 len(resources), op, action_date)) 

1035 

1036 tags = {tag: msg} 

1037 

1038 batch_size = self.data.get('batch_size', self.batch_size) 

1039 client = self.get_client() 

1040 

1041 _common_tag_processer( 

1042 self.executor_factory, batch_size, self.concurrency, client, 

1043 self.process_resource_set, self.id_key, resources, tags, self.log) 

1044 

1045 def process_resource_set(self, client, resource_set, tags): 

1046 arns = self.manager.get_arns(resource_set) 

1047 return universal_retry( 

1048 client.tag_resources, ResourceARNList=arns, Tags=tags) 

1049 

1050 def get_client(self): 

1051 # For global resources, manage tags from us-east-1 

1052 region = (getattr(self.manager.resource_type, 'global_resource', None) 

1053 and 'us-east-1' or self.manager.region) 

1054 return utils.local_session(self.manager.session_factory).client( 

1055 'resourcegroupstaggingapi', region_name=region) 

1056 

1057 

1058class CopyRelatedResourceTag(Tag): 

1059 """ 

1060 Copy a related resource tag to its associated resource 

1061 

1062 In some scenarios, resource tags from a related resource should be applied 

1063 to its child resource. For example, EBS Volume tags propogating to their 

1064 snapshots. To use this action, specify the resource type that contains the 

1065 tags that are to be copied, which can be found by using the 

1066 `custodian schema` command. 

1067 

1068 Then, specify the key on the resource that references the related resource. 

1069 In the case of ebs-snapshot, the VolumeId attribute would be the key that 

1070 identifies the related resource, ebs. 

1071 

1072 Finally, specify a list of tag keys to copy from the related resource onto 

1073 the original resource. The special character "*" can be used to signify that 

1074 all tags from the related resource should be copied to the original resource. 

1075 

1076 To raise an error when related resources cannot be found, use the 

1077 `skip_missing` option. By default, this is set to True. 

1078 

1079 :example: 

1080 

1081 .. code-block:: yaml 

1082 

1083 policies: 

1084 - name: copy-tags-from-ebs-volume-to-snapshot 

1085 resource: ebs-snapshot 

1086 actions: 

1087 - type: copy-related-tag 

1088 resource: ebs 

1089 skip_missing: True 

1090 key: VolumeId 

1091 tags: '*' 

1092 

1093 In the event that the resource type is not supported in Cloud Custodian but 

1094 is supported in the resources groups tagging api, use the resourcegroupstaggingapi 

1095 resource type to reference the resource. The value should be an ARN for the 

1096 related resource. 

1097 

1098 :example: 

1099 

1100 .. code-block:: yaml 

1101 

1102 policies: 

1103 - name: copy-tags-from-unsupported-resource 

1104 resource: ebs-snapshot 

1105 actions: 

1106 - type: copy-related-tag 

1107 resource: resourcegroupstaggingapi 

1108 key: tag:a-resource-tag 

1109 tags: '*' 

1110 

1111 """ 

1112 

1113 schema = utils.type_schema( 

1114 'copy-related-tag', 

1115 resource={'type': 'string'}, 

1116 skip_missing={'type': 'boolean'}, 

1117 key={'type': 'string'}, 

1118 tags={'oneOf': [ 

1119 {'enum': ['*']}, 

1120 {'type': 'array'} 

1121 ]}, 

1122 required=['tags', 'key', 'resource'] 

1123 ) 

1124 schema_alias = True 

1125 

1126 def get_permissions(self): 

1127 return self.manager.action_registry.get('tag').permissions 

1128 

1129 def validate(self): 

1130 related_resource = self.data['resource'] 

1131 if '.' in self.data['resource']: 

1132 related_resource = self.data['resource'].split('.')[-1] 

1133 load_resources((f'aws.{related_resource}', )) 

1134 if ( 

1135 related_resource not in aws_resources.keys() and 

1136 related_resource != "resourcegroupstaggingapi" 

1137 ): 

1138 raise PolicyValidationError( 

1139 "Error: Invalid resource type selected: %s" % related_resource 

1140 ) 

1141 # ideally should never raise here since we shouldn't be applying this 

1142 # action to a resource if it doesn't have a tag action implemented 

1143 if self.manager.action_registry.get('tag') is None: 

1144 raise PolicyValidationError( 

1145 "Error: Tag action missing on resource" 

1146 ) 

1147 return self 

1148 

1149 def process(self, resources): 

1150 related_resources = [] 

1151 if self.data['key'].startswith('tag:'): 

1152 tag_key = self.data['key'].split(':', 1)[-1] 

1153 

1154 search_path = "[].[Tags[?Key=='%s'].Value | [0]]" % tag_key 

1155 else: 

1156 search_path = "[].[%s]" % self.data['key'] 

1157 

1158 for rrid, r in zip(utils.jmespath_search(search_path, resources), 

1159 resources): 

1160 related_resources.append((rrid.pop(), r)) 

1161 

1162 related_ids = {r[0] for r in related_resources} 

1163 missing = False 

1164 if None in related_ids: 

1165 missing = True 

1166 related_ids.discard(None) 

1167 related_tag_map = {} 

1168 if self.data['resource'] == 'resourcegroupstaggingapi': 

1169 related_tag_map = self.get_resource_tag_map_universal(related_ids) 

1170 else: 

1171 related_tag_map = self.get_resource_tag_map(self.data['resource'], related_ids) 

1172 

1173 missing_related_tags = related_ids.difference(related_tag_map.keys()) 

1174 if not self.data.get('skip_missing', True) and (missing_related_tags or missing): 

1175 raise PolicyExecutionError( 

1176 "Unable to find all %d %s related resources tags %d missing" % ( 

1177 len(related_ids), self.data['resource'], 

1178 len(missing_related_tags) + int(missing))) 

1179 

1180 # rely on resource manager tag action implementation as it can differ between resources 

1181 tag_action = self.manager.action_registry.get('tag')({}, self.manager) 

1182 tag_action.id_key = tag_action.manager.get_model().id 

1183 client = tag_action.get_client() 

1184 

1185 stats = Counter() 

1186 

1187 for related, r in related_resources: 

1188 if (related is None or 

1189 related in missing_related_tags or 

1190 not related_tag_map[related]): 

1191 stats['missing'] += 1 

1192 elif self.process_resource( 

1193 client, r, related_tag_map[related], self.data['tags'], tag_action): 

1194 stats['tagged'] += 1 

1195 else: 

1196 stats['unchanged'] += 1 

1197 

1198 self.log.info( 

1199 'Tagged %d resources from related, missing-skipped %d unchanged %d', 

1200 stats['tagged'], stats['missing'], stats['unchanged']) 

1201 

1202 def process_resource(self, client, r, related_tags, tag_keys, tag_action): 

1203 tags = {} 

1204 resource_tags = { 

1205 t['Key']: t['Value'] for t in r.get('Tags', []) if not t['Key'].startswith('aws:')} 

1206 

1207 if tag_keys == '*': 

1208 tags = {k: v for k, v in related_tags.items() 

1209 if resource_tags.get(k) != v and not k.startswith('aws:')} 

1210 else: 

1211 tags = {k: v for k, v in related_tags.items() 

1212 if k in tag_keys and resource_tags.get(k) != v} 

1213 if not tags: 

1214 return 

1215 if not isinstance(tag_action, UniversalTag): 

1216 tags = [{'Key': k, 'Value': v} for k, v in tags.items()] 

1217 tag_action.process_resource_set(client, [r], tags) 

1218 return True 

1219 

1220 def get_resource_tag_map(self, r_type, ids): 

1221 """ 

1222 Returns a mapping of {resource_id: {tagkey: tagvalue}} 

1223 """ 

1224 manager = self.manager.get_resource_manager(r_type) 

1225 r_id = manager.resource_type.id 

1226 

1227 return { 

1228 r[r_id]: {t['Key']: t['Value'] for t in r.get('Tags', [])} 

1229 for r in manager.get_resources(list(ids)) 

1230 } 

1231 

1232 def get_resource_tag_map_universal(self, ids): 

1233 related_region = None 

1234 ids = list(ids) 

1235 if ids: 

1236 if ids[0].startswith('arn:aws:iam'): 

1237 related_region = 'us-east-1' 

1238 client = utils.local_session( 

1239 self.manager.session_factory).client( 

1240 'resourcegroupstaggingapi', region_name=related_region) 

1241 from c7n.query import RetryPageIterator 

1242 paginator = client.get_paginator('get_resources') 

1243 paginator.PAGE_ITERATOR_CLS = RetryPageIterator 

1244 

1245 resource_tag_results = client.get_resources( 

1246 ResourceARNList=list(ids))['ResourceTagMappingList'] 

1247 resource_tag_map = { 

1248 r['ResourceARN']: {r['Key']: r['Value'] for r in r['Tags']} 

1249 for r in resource_tag_results} 

1250 return resource_tag_map 

1251 

1252 @classmethod 

1253 def register_resources(klass, registry, resource_class): 

1254 if not resource_class.action_registry.get('tag'): 

1255 return 

1256 resource_class.action_registry.register('copy-related-tag', klass) 

1257 

1258 

1259aws_resources.subscribe(CopyRelatedResourceTag.register_resources) 

1260 

1261 

1262def universal_retry(method, ResourceARNList, **kw): 

1263 """Retry support for resourcegroup tagging apis. 

1264 

1265 The resource group tagging api typically returns a 200 status code 

1266 with embedded resource specific errors. To enable resource specific 

1267 retry on throttles, we extract those, perform backoff w/ jitter and 

1268 continue. Other errors are immediately raised. 

1269 

1270 We do not aggregate unified resource responses across retries, only the 

1271 last successful response is returned for a subset of the resources if 

1272 a retry is performed. 

1273 """ 

1274 max_attempts = 6 

1275 

1276 for idx, delay in enumerate( 

1277 utils.backoff_delays(1.5, 2 ** 8, jitter=True)): 

1278 response = method(ResourceARNList=ResourceARNList, **kw) 

1279 failures = response.get('FailedResourcesMap', {}) 

1280 if not failures: 

1281 return response 

1282 

1283 errors = {} 

1284 throttles = set() 

1285 

1286 for f_arn in failures: 

1287 error_code = failures[f_arn]['ErrorCode'] 

1288 if error_code == 'ThrottlingException': 

1289 throttles.add(f_arn) 

1290 elif error_code == 'ResourceNotFoundException': 

1291 continue 

1292 else: 

1293 errors[f_arn] = error_code 

1294 

1295 if errors: 

1296 raise Exception("Resource Tag Errors %s" % (errors)) 

1297 

1298 if idx == max_attempts - 1: 

1299 raise Exception("Resource Tag Throttled %s" % (", ".join(throttles))) 

1300 

1301 time.sleep(delay) 

1302 ResourceARNList = list(throttles) 

1303 

1304 

1305def coalesce_copy_user_tags(resource, copy_tags, user_tags): 

1306 """ 

1307 Returns a list of tags from resource and user supplied in 

1308 the format: [{'Key': 'key', 'Value': 'value'}] 

1309 

1310 Due to drift on implementation on copy-tags/tags used throughout 

1311 the code base, the following options are supported: 

1312 

1313 copy_tags (Tags to copy from the resource): 

1314 - list of str, e.g. ['key1', 'key2', '*'] 

1315 - bool 

1316 

1317 user_tags (User supplied tags to apply): 

1318 - dict of key-value pairs, e.g. {Key: Value, Key2: Value} 

1319 - list of dict e.g. [{'Key': k, 'Value': v}] 

1320 

1321 In the case that there is a conflict in a user supplied tag 

1322 and an existing tag on the resource, the user supplied tags will 

1323 take priority. 

1324 

1325 Additionally, a value of '*' in copy_tags can be used to signify 

1326 to copy all tags from the resource. 

1327 """ 

1328 

1329 assert isinstance(copy_tags, bool) or isinstance(copy_tags, list) 

1330 assert isinstance(user_tags, dict) or isinstance(user_tags, list) 

1331 

1332 r_tags = resource.get('Tags', []) 

1333 

1334 if isinstance(copy_tags, list): 

1335 if '*' in copy_tags: 

1336 copy_keys = {t['Key'] for t in r_tags if not t['Key'].startswith('aws:')} 

1337 else: 

1338 copy_keys = set(copy_tags) 

1339 

1340 if isinstance(copy_tags, bool): 

1341 if copy_tags is True: 

1342 copy_keys = {t['Key'] for t in r_tags if not t['Key'].startswith('aws:')} 

1343 else: 

1344 copy_keys = set() 

1345 

1346 if isinstance(user_tags, dict): 

1347 user_tags = [{'Key': k, 'Value': v} for k, v in user_tags.items()] 

1348 

1349 user_keys = {t['Key'] for t in user_tags} 

1350 tags_diff = list(copy_keys.difference(user_keys)) 

1351 resource_tags_to_copy = [t for t in r_tags if t['Key'] in tags_diff] 

1352 user_tags.extend(resource_tags_to_copy) 

1353 return user_tags