Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/query.py: 34%

516 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Query capability built on skew metamodel 

5 

6tags_spec -> s3, elb, rds 

7""" 

8from concurrent.futures import as_completed 

9import functools 

10import itertools 

11import json 

12from typing import List 

13 

14import os 

15 

16from c7n.actions import ActionRegistry 

17from c7n.exceptions import ClientError, ResourceLimitExceeded, PolicyExecutionError 

18from c7n.filters import FilterRegistry, MetricsFilter 

19from c7n.manager import ResourceManager 

20from c7n.registry import PluginRegistry 

21from c7n.tags import register_ec2_tags, register_universal_tags, universal_augment 

22from c7n.utils import ( 

23 local_session, generate_arn, get_retry, chunks, camelResource, jmespath_compile) 

24 

25 

try:
    from botocore.paginate import PageIterator, Paginator
except ImportError:
    # botocore is not importable -- likely another provider running in a
    # serverless environment. Define inert stand-ins so the names used
    # further down in this module still exist.
    class PageIterator:
        """Stand-in for botocore's PageIterator when botocore is missing."""

    class Paginator:
        """Stand-in for botocore's Paginator when botocore is missing."""

35 

36 

class ResourceQuery:
    """Enumerate or fetch resources through a service client.

    The operation to call, the result path and any fixed arguments come
    from the resource type's ``enum_spec``; identity lookups additionally
    use ``filter_name`` / ``filter_type`` / ``id``.
    """

    def __init__(self, session_factory):
        self.session_factory = session_factory

    @staticmethod
    def resolve(resource_type):
        """Return ``resource_type`` unchanged after checking it is a class.

        :raises ValueError: when ``resource_type`` is not a class object.
        """
        if isinstance(resource_type, type):
            return resource_type
        raise ValueError(resource_type)

    def _invoke_client_enum(self, client, enum_op, params, path, retry=None):
        """Call ``enum_op`` on ``client`` and extract ``path`` from the result.

        Paginated operations are fully consumed; when ``retry`` is truthy the
        paginator is switched to the retrying page iterator. With a falsy
        ``path`` the raw response is returned.
        """
        if not client.can_paginate(enum_op):
            data = getattr(client, enum_op)(**params)
        else:
            paginator = client.get_paginator(enum_op)
            if retry:
                paginator.PAGE_ITERATOR_CLS = RetryPageIterator
            data = paginator.paginate(**params).build_full_result()

        if not path:
            return data
        return jmespath_compile(path).search(data)

    def filter(self, resource_manager, **params):
        """Query a set of resources."""
        model = self.resolve(resource_manager.resource_type)
        if resource_manager.get_client:
            client = resource_manager.get_client()
        else:
            session = local_session(self.session_factory)
            client = session.client(
                model.service, resource_manager.config.region)

        enum_op, path, extra_args = model.enum_spec
        if extra_args:
            params.update(extra_args)

        retry = getattr(resource_manager, 'retry', None)
        found = self._invoke_client_enum(client, enum_op, params, path, retry)
        # Normalize an empty/None enumeration result to a list.
        return found or []

    def get(self, resource_manager, identities):
        """Get resources by identities
        """
        model = self.resolve(resource_manager.resource_type)
        params = {}
        server_side = False

        # Try to formulate server side query in the below two scenarios
        # else fall back to client side filtering
        if model.filter_name:
            if model.filter_type == 'list':
                params[model.filter_name] = identities
                server_side = True
            elif model.filter_type == 'scalar' and len(identities) == 1:
                params[model.filter_name] = identities[0]
                server_side = True

        resources = self.filter(resource_manager, **params)
        if server_side:
            return resources

        # This logic was added to prevent the issue from:
        # https://github.com/cloud-custodian/cloud-custodian/issues/1398
        if all(isinstance(r, str) for r in resources):
            return [r for r in resources if r in identities]

        # This logic should fix https://github.com/cloud-custodian/cloud-custodian/issues/7573
        if all(isinstance(r, tuple) for r in resources):
            return [(p, r) for p, r in resources if r[model.id] in identities]

        return [r for r in resources if r[model.id] in identities]

112 

113 

class ChildResourceQuery(ResourceQuery):
    """A resource query for resources that must be queried with parent information.

    Several resource types can only be queried in the context of their
    parents identifiers. ie. efs mount targets (parent efs), route53 resource
    records (parent hosted zone), ecs services (ecs cluster).
    """

    # When true, ``filter`` returns ``(parent_id, resource)`` tuples instead
    # of bare resources.
    capture_parent_id = False
    # Key written onto each child when the parent spec asks for annotation.
    parent_key = 'c7n:parent-id'

    def __init__(self, session_factory, manager):
        self.session_factory = session_factory
        self.manager = manager

    def filter(self, resource_manager, parent_ids=None, **params):
        """Query a set of resources.

        :param resource_manager: manager whose ``resource_type`` supplies
            ``enum_spec`` and ``parent_spec``.
        :param parent_ids: optional parent identifiers; when empty they are
            collected from the parent resource manager.
        :param params: extra arguments for the enumeration call. If they
            already contain the parent key, a single call is made with them.
        :returns: list of children, or of ``(parent_id, child)`` tuples when
            ``capture_parent_id`` is set.
        """
        m = self.resolve(resource_manager.resource_type)
        if resource_manager.get_client:
            client = resource_manager.get_client()
        else:
            client = local_session(self.session_factory).client(m.service)

        enum_op, path, extra_args = m.enum_spec
        if extra_args:
            params.update(extra_args)

        parent_type, parent_key, annotate_parent = m.parent_spec
        parents = self.manager.get_resource_manager(parent_type)
        if not parent_ids:
            parent_ids = []
            for p in parents.resources(augment=False):
                if isinstance(p, str):
                    parent_ids.append(p)
                else:
                    parent_ids.append(p[parents.resource_type.id])

        # Bail out with no parent ids...
        existing_param = parent_key in params
        if not existing_param and len(parent_ids) == 0:
            return []

        # Handle a query with parent id
        if existing_param:
            return self._invoke_client_enum(client, enum_op, params, path)

        # Have to query separately for each parent's children.
        results = []
        for parent_id in parent_ids:
            merged_params = self.get_parent_parameters(params, parent_id, parent_key)
            # The enumeration can come back empty/None (ResourceQuery.filter
            # guards the same call with ``or []``); normalize before the
            # annotation loop so it never iterates over None.
            subset = self._invoke_client_enum(
                client, enum_op, merged_params, path,
                retry=self.manager.retry) or []
            if not subset:
                continue
            if annotate_parent:
                for r in subset:
                    r[self.parent_key] = parent_id
            if self.capture_parent_id:
                results.extend([(parent_id, s) for s in subset])
            else:
                results.extend(subset)
        return results

    def get_parent_parameters(self, params, parent_id, parent_key):
        """Return a copy of ``params`` with ``parent_key`` set to ``parent_id``."""
        return dict(params, **{parent_key: parent_id})

177 

178 

class QueryMeta(type):
    """Metaclass wiring filter/action registries onto resource managers.

    Classes that declare ``resource_type`` get their own filter and action
    registries unless they provide them. When the resource type is truthy,
    metrics and tag support are registered according to its metadata.
    """

    def __new__(cls, name, parents, attrs):
        if 'resource_type' in attrs:
            if 'filter_registry' not in attrs:
                attrs['filter_registry'] = FilterRegistry(
                    '%s.filters' % name.lower())
            if 'action_registry' not in attrs:
                attrs['action_registry'] = ActionRegistry(
                    '%s.actions' % name.lower())

            if attrs['resource_type']:
                model = ResourceQuery.resolve(attrs['resource_type'])
                filters = attrs['filter_registry']
                actions = attrs['action_registry']

                # Generic cloud watch metrics support
                if model.dimension:
                    filters.register('metrics', MetricsFilter)

                # EC2 Service boilerplate: generic ec2 resource tag support
                if model.service == 'ec2' and getattr(model, 'taggable', True):
                    register_ec2_tags(filters, actions)

                if getattr(model, 'universal_taggable', False):
                    # A plain boolean selects the compatibility registration.
                    register_universal_tags(
                        filters, actions,
                        compatibility=isinstance(model.universal_taggable, bool))

        return super(QueryMeta, cls).__new__(cls, name, parents, attrs)

210 

211 

212def _napi(op_name): 

213 return op_name.title().replace('_', '') 

214 

215 

# Registry of resource sources; classes below register themselves under
# 'describe', 'describe-child' and 'config'.
sources = PluginRegistry('sources')

217 

218 

@sources.register('describe')
class DescribeSource:
    """Source that retrieves resources through ``resource_query_factory``.

    Resources can be enriched afterwards using the model's ``detail_spec``
    (one call per resource) or ``batch_detail_spec`` (one call per chunk).
    """

    resource_query_factory = ResourceQuery

    def __init__(self, manager):
        self.manager = manager
        self.query = self.get_query()

    def get_resources(self, ids, cache=True):
        """Fetch the resources identified by ``ids`` via the query object."""
        return self.query.get(self.manager, ids)

    def resources(self, query):
        """Enumerate resources, passing ``query`` as enumeration arguments."""
        return self.query.filter(self.manager, **query)

    def get_query(self):
        """Build the query object from the manager's session factory."""
        return self.resource_query_factory(self.manager.session_factory)

    def get_query_params(self, query_params):
        """Return ``query_params`` untouched."""
        return query_params

    def get_permissions(self):
        """Return the permission strings needed to enumerate and augment."""
        model = self.manager.get_model()
        prefix = model.permission_prefix or model.service

        def as_permission(spec):
            # The first element of a spec is the client operation name.
            return '%s:%s' % (prefix, _napi(spec[0]))

        if model.permissions_enum:
            perms = list(model.permissions_enum)
        else:
            perms = [as_permission(model.enum_spec)]

        if model.permissions_augment:
            perms.extend(model.permissions_augment)
            return perms

        detail = getattr(model, 'detail_spec', None)
        if detail:
            perms.append("%s:%s" % (prefix, _napi(detail[0])))
        batch_detail = getattr(model, 'batch_detail_spec', None)
        if batch_detail:
            perms.append("%s:%s" % (prefix, _napi(batch_detail[0])))
        return perms

    def augment(self, resources):
        """Enrich ``resources`` with detail calls when the model defines them.

        ``detail_spec`` takes precedence over ``batch_detail_spec``; with
        neither, the resources are returned unchanged. Work is split into
        chunks of ``manager.chunk_size`` and mapped over the manager's
        executor.
        """
        model = self.manager.get_model()
        detail_spec = getattr(model, 'detail_spec', None)
        if detail_spec:
            augmenter = _scalar_augment
        else:
            detail_spec = getattr(model, 'batch_detail_spec', None)
            if not detail_spec:
                return resources
            augmenter = _batch_augment

        if self.manager.get_client:
            client = self.manager.get_client()
        else:
            session = local_session(self.manager.session_factory)
            client = session.client(
                model.service, region_name=self.manager.config.region)

        bound = functools.partial(
            augmenter, self.manager, model, detail_spec, client)
        with self.manager.executor_factory(
                max_workers=self.manager.max_workers) as pool:
            batches = list(pool.map(
                bound, chunks(resources, self.manager.chunk_size)))
            return list(itertools.chain.from_iterable(batches))

278 

279 

class DescribeWithResourceTags(DescribeSource):
    """Describe source that also passes resources through ``universal_augment``."""

    def augment(self, resources):
        described = super().augment(resources)
        return universal_augment(self.manager, described)

284 

285 

@sources.register('describe-child')
class ChildDescribeSource(DescribeSource):
    """Describe source whose query object also receives the manager."""

    resource_query_factory = ChildResourceQuery

    def get_query(self):
        manager = self.manager
        return self.resource_query_factory(manager.session_factory, manager)

294 

295 

@sources.register('config')
class ConfigSource:
    """Resource source backed by the ``config`` service client.

    Resources are loaded either with a select expression
    (``select_resource_config``) or, as a fallback, by listing discovered
    resources and fetching each one's latest configuration item.
    """

    retry = staticmethod(get_retry(('ThrottlingException',)))

    def __init__(self, manager):
        self.manager = manager
        # Whether the resource id attribute starts with an upper-case letter;
        # forwarded to camelResource as ``implicitTitle``.
        self.titleCase = self.manager.resource_type.id[0].isupper()

    def get_permissions(self):
        """Return the permission strings this source declares."""
        return ["config:GetResourceConfigHistory",
                "config:ListDiscoveredResources"]

    def get_resources(self, ids, cache=True):
        """Load the most recent configuration item for each id in ``ids``.

        Ids without any revision are skipped; falsy loaded resources are
        dropped from the result.
        """
        client = local_session(self.manager.session_factory).client('config')
        results = []
        m = self.manager.get_model()
        for i in ids:
            revisions = self.retry(
                client.get_resource_config_history,
                resourceId=i,
                resourceType=m.config_type,
                limit=1).get('configurationItems')
            if not revisions:
                continue
            results.append(self.load_resource(revisions[0]))
        return list(filter(None, results))

    def get_query_params(self, query):
        """Parse config select expression from policy and parameter.

        On policy config supports a full statement being given, or
        a clause that will be added to the where expression.

        If no query is specified, a default query is utilized.

        A valid query should at minimum select fields
        for configuration, supplementaryConfiguration and
        must have resourceType qualifier.

        :raises PolicyExecutionError: if ``query`` is truthy but not a dict.
        """
        if query and not isinstance(query, dict):
            raise PolicyExecutionError("invalid config source query %s" % (query,))

        if query is None and 'query' in self.manager.data:
            _q = [q for q in self.manager.data['query'] if 'expr' in q]
            if _q:
                query = _q.pop()

        if query is None and 'query' in self.manager.data:
            _c = [q['clause'] for q in self.manager.data['query'] if 'clause' in q]
            if _c:
                _c = _c.pop()
        elif query:
            return query
        else:
            _c = None

        s = ("select resourceId, configuration, supplementaryConfiguration "
             "where resourceType = '{}'").format(self.manager.resource_type.config_type)

        if _c:
            # Separate the clause from the quoted resource type; previously
            # the two were concatenated without any whitespace.
            s += " AND {}".format(_c)

        return {'expr': s}

    def load_resource(self, item):
        """Convert a configuration item into a resource dict with tags."""
        item_config = self._load_item_config(item)
        resource = camelResource(
            item_config, implicitDate=True, implicitTitle=self.titleCase)
        self._load_resource_tags(resource, item)
        return resource

    def _load_item_config(self, item):
        """Return the item's configuration, decoding it when it is a JSON string."""
        if isinstance(item['configuration'], str):
            item_config = json.loads(item['configuration'])
        else:
            item_config = item['configuration']
        return item_config

    def _load_resource_tags(self, resource, item):
        """Populate ``resource['Tags']`` from the item when not already set."""
        # normalized tag loading across the many variants of config's inconsistencies.
        if 'Tags' in resource:
            return
        if item.get('tags'):
            resource['Tags'] = [
                {u'Key': k, u'Value': v} for k, v in item['tags'].items()]
            return

        # Items without supplementaryConfiguration are treated as having
        # no supplementary tags instead of raising KeyError.
        stags = (item.get('supplementaryConfiguration') or {}).get('Tags')
        if not stags:
            return
        if isinstance(stags, str):
            stags = json.loads(stags)
        if isinstance(stags, list):
            resource['Tags'] = [
                {u'Key': t.get('key', t.get('tagKey')),
                 u'Value': t.get('value', t.get('tagValue'))}
                for t in stags
            ]
        elif isinstance(stags, dict):
            resource['Tags'] = [{u'Key': k, u'Value': v} for k, v in stags.items()]

    def get_listed_resources(self, client):
        """List discovered resource ids and load them in chunks of 50.

        An exception raised while loading a chunk is logged and then
        re-raised by ``f.result()``.
        """
        # fallback for when config decides to arbitrarily break select
        # resource for a given resource type.
        paginator = client.get_paginator('list_discovered_resources')
        paginator.PAGE_ITERATOR_CLS = RetryPageIterator
        pages = paginator.paginate(
            resourceType=self.manager.get_model().config_type)
        results = []

        with self.manager.executor_factory(max_workers=2) as w:
            ridents = pages.build_full_result()
            resource_ids = [
                r['resourceId'] for r in ridents.get('resourceIdentifiers', ())]
            self.manager.log.debug(
                "querying %d %s resources",
                len(resource_ids),
                self.manager.__class__.__name__.lower())

            # Submit every chunk before waiting on any of them so the
            # executor's workers can actually overlap; the futures list used
            # to be reset for each chunk, which serialized the work.
            futures = [
                w.submit(self.get_resources, resource_set)
                for resource_set in chunks(resource_ids, 50)]
            for f in as_completed(futures):
                if f.exception():
                    self.manager.log.error(
                        "Exception getting resources from config \n %s" % (
                            f.exception()))
                results.extend(f.result())
        return results

    def resources(self, query=None):
        """Load resources with a select expression, falling back to listing.

        The fallback only applies when nothing was returned and the
        effective query equals the default one.
        """
        client = local_session(self.manager.session_factory).client('config')
        query = self.get_query_params(query)
        pager = Paginator(
            client.select_resource_config,
            {'input_token': 'NextToken', 'output_token': 'NextToken',
             'result_key': 'Results'},
            client.meta.service_model.operation_model('SelectResourceConfig'))
        pager.PAGE_ITERATOR_CLS = RetryPageIterator

        results = []
        for page in pager.paginate(Expression=query['expr']):
            results.extend([
                self.load_resource(json.loads(r)) for r in page['Results']])

        # Config arbitrarily breaks which resource types its supports for query/select
        # on any given day, if we don't have a user defined query, then fallback
        # to iteration mode.
        if not results and query == self.get_query_params({}):
            results = self.get_listed_resources(client)
        return results

    def augment(self, resources):
        """Return ``resources`` unchanged."""
        return resources

448 

449 

class QueryResourceManager(ResourceManager, metaclass=QueryMeta):
    """Resource manager that delegates retrieval to a pluggable source.

    The source is chosen by the policy's ``source`` key (``describe`` when
    absent) and handles enumeration, lookups by id and augmentation.
    """

    resource_type = ""

    # TODO Check if we can move to describe source
    max_workers = 3
    chunk_size = 20

    # Lazily built by the ``generate_arn`` property.
    _generate_arn = None

    retry = staticmethod(
        get_retry((
            'TooManyRequestsException',
            'ThrottlingException',
            'RequestLimitExceeded',
            'Throttled',
            'ThrottledException',
            'Throttling',
            'Client.RequestLimitExceeded')))

    source_mapping = sources

    def __init__(self, ctx, data):
        super().__init__(ctx, data)
        self.source = self.get_source(self.source_type)

    @property
    def source_type(self):
        """Name of the configured source, ``describe`` by default."""
        return self.data.get('source', 'describe')

    def get_source(self, source_type):
        """Instantiate the source registered under ``source_type``.

        :raises KeyError: when no source is registered under that name.
        """
        mapping = self.source_mapping
        if source_type in mapping:
            factory = mapping.get(source_type)
        elif source_type in sources:
            factory = sources[source_type]
        else:
            raise KeyError("Invalid Source %s" % source_type)
        return factory(self)

    @classmethod
    def has_arn(cls):
        """Tell whether arns can be obtained for this resource type."""
        rtype = cls.resource_type
        if rtype.arn is not None:
            return bool(rtype.arn)
        if getattr(rtype, 'arn_type', None) is not None:
            return True
        # Only a ``get_arns`` defined directly on this class counts.
        return bool(cls.__dict__.get('get_arns'))

    @classmethod
    def get_model(cls):
        """Return the resource type metadata class."""
        return ResourceQuery.resolve(cls.resource_type)

    @classmethod
    def match_ids(cls, ids):
        """return ids that match this resource type's id format."""
        prefix = getattr(cls.get_model(), 'id_prefix', None)
        if prefix is None:
            return ids
        return [i for i in ids if i.startswith(prefix)]

    def get_permissions(self):
        """Return the source's permissions plus any declared on the manager."""
        perms = self.source.get_permissions()
        if getattr(self, 'permissions', None):
            perms.extend(self.permissions)
        return perms

    def get_cache_key(self, query):
        """Build the cache key for ``query`` in this account/region/source."""
        return dict([
            ('account', self.account_id),
            ('region', self.config.region),
            ('resource', str(self.__class__.__name__)),
            ('source', self.source_type),
            ('q', query),
        ])

    def resources(self, query=None, augment=True) -> List[dict]:
        """Return the filtered resources for ``query``.

        Cached results are used when present. Otherwise resources are
        fetched from the source and, when ``augment`` is true, augmented and
        saved to the cache. Resource limits are checked only when this
        manager's data is the policy's own data.
        """
        query = self.source.get_query_params(query)
        cache_key = self.get_cache_key(query)
        resources = None

        with self._cache:
            resources = self._cache.get(cache_key)
            if resources is not None:
                qualified = "%s.%s" % (
                    self.__class__.__module__, self.__class__.__name__)
                self.log.debug(
                    "Using cached %s: %d" % (qualified, len(resources)))
            else:
                if query is None:
                    query = {}
                with self.ctx.tracer.subsegment('resource-fetch'):
                    resources = self.source.resources(query)
                if augment:
                    with self.ctx.tracer.subsegment('resource-augment'):
                        resources = self.augment(resources)
                    # Don't pollute cache with unaugmented resources.
                    self._cache.save(cache_key, resources)

        population = len(resources)
        with self.ctx.tracer.subsegment('filter'):
            resources = self.filter_resources(resources)

        # Check if we're out of a policies execution limits.
        if self.data == self.ctx.policy.data:
            self.check_resource_limit(len(resources), population)
        return resources

    def check_resource_limit(self, selection_count, population_count):
        """Check if policy's execution affects more resources then its limit.

        Ideally this would be at a higher level but we've hidden
        filtering behind the resource manager facade for default usage.
        """
        limits = MaxResourceLimit(
            self.ctx.policy, selection_count, population_count)
        return limits.check_resource_limits()

    def _get_cached_resources(self, ids):
        """Return cached resources whose id is in ``ids``, or None if uncached."""
        key = self.get_cache_key(None)
        with self._cache:
            cached = self._cache.get(key)
            if cached is None:
                return None
            self.log.debug("Using cached results for get_resources")
            id_key = self.get_model().id
            wanted = set(ids)
            return [r for r in cached if r[id_key] in wanted]

    def get_resources(self, ids, cache=True, augment=True):
        """Fetch resources by id, preferring cached results when allowed.

        A ``ClientError`` from the source or from augmentation is logged and
        results in an empty list.
        """
        if not ids:
            return []
        if cache:
            cached = self._get_cached_resources(ids)
            if cached is not None:
                return cached
        try:
            fetched = self.source.get_resources(ids)
            if augment:
                fetched = self.augment(fetched)
            return fetched
        except ClientError as e:
            self.log.warning("event ids not resolved: %s error:%s" % (ids, e))
            return []

    def augment(self, resources):
        """subclasses may want to augment resources with additional information.

        ie. we want tags by default (rds, elb), and policy, location, acl for
        s3 buckets.
        """
        return self.source.augment(resources)

    @property
    def account_id(self):
        """ Return the current account ID.

        This should now be passed in using the --account-id flag, but for a
        period of time we will support the old behavior of inferring this from
        IAM.
        """
        return self.config.account_id

    @property
    def region(self):
        """ Return the current region.
        """
        return self.config.region

    def get_arns(self, resources):
        """Return one arn per resource.

        Uses the model's ``arn`` attribute when set, the id itself when it
        already starts with ``arn``, and a generated arn otherwise.

        :raises ValueError: when the model's ``arn`` is ``False``.
        """
        model = self.get_model()
        arn_key = getattr(model, 'arn', None)
        if arn_key is False:
            raise ValueError("%s do not have arns" % self.type)

        id_key = model.id
        arns = []
        for resource in resources:
            _id = resource[id_key]
            if arn_key:
                arn = resource[arn_key]
            elif 'arn' in _id[:3]:
                arn = _id
            else:
                arn = self.generate_arn(_id)
            arns.append(arn)
        return arns

    @property
    def generate_arn(self):
        """ Generates generic arn if ID is not already arn format.
        """
        if self._generate_arn is None:
            rtype = self.resource_type
            # Global resources carry no region in their arn.
            if rtype.global_resource:
                region = ""
            else:
                region = self.config.region or ""
            self._generate_arn = functools.partial(
                generate_arn,
                rtype.arn_service or rtype.service,
                region=region,
                account_id=self.account_id,
                resource_type=rtype.arn_type,
                separator=rtype.arn_separator)
        return self._generate_arn

650 

651 

class MaxResourceLimit:
    """Evaluate a policy's ``max_resources`` / ``max_resources_percent`` limits.

    ``op`` decides how an absolute amount and a percentage combine: with
    ``"and"`` both must be exceeded, with anything else either one suffices.
    """

    # Default operator, overridable through the environment.
    C7N_MAXRES_OP = os.environ.get("C7N_MAXRES_OP", 'or')

    def __init__(self, policy, selection_count, population_count):
        """
        :param policy: object exposing ``name``, ``max_resources`` and
            ``max_resources_percent``.
        :param selection_count: number of resources selected by the policy.
        :param population_count: number of resources before filtering.
        """
        self.p = policy
        # Lower-case the default the same way the dict form is handled in
        # _parse_policy, so C7N_MAXRES_OP=AND matches the "and" comparisons.
        self.op = MaxResourceLimit.C7N_MAXRES_OP.lower()
        self.selection_count = selection_count
        self.population_count = population_count
        self.amount = None
        self.percentage_amount = None
        self.percent = None
        self._parse_policy()

    def _parse_policy(self,):
        """Read amount, percent and op from the policy's limit settings."""
        if isinstance(self.p.max_resources, dict):
            self.op = self.p.max_resources.get("op", MaxResourceLimit.C7N_MAXRES_OP).lower()
            self.percent = self.p.max_resources.get("percent")
            self.amount = self.p.max_resources.get("amount")

        if isinstance(self.p.max_resources, int):
            self.amount = self.p.max_resources

        if isinstance(self.p.max_resources_percent, (int, float)):
            self.percent = self.p.max_resources_percent

        if self.percent:
            # Percentage expressed as a resource count of the population.
            self.percentage_amount = self.population_count * (self.percent / 100.0)

    def check_resource_limits(self):
        """Raise when the selection exceeds the configured limits.

        :raises ResourceLimitExceeded: if the limits are exceeded under the
            configured ``op``.
        """
        if self.percentage_amount and self.amount:
            if (self.selection_count > self.amount and
               self.selection_count > self.percentage_amount and self.op == "and"):
                raise ResourceLimitExceeded(
                    ("policy:%s exceeded resource-limit:{limit} and percentage-limit:%s%% "
                     "found:{selection_count} total:{population_count}")
                    % (self.p.name, self.percent), "max-resource and max-percent",
                    self.amount, self.selection_count, self.population_count)

        if self.amount:
            if self.selection_count > self.amount and self.op != "and":
                raise ResourceLimitExceeded(
                    ("policy:%s exceeded resource-limit:{limit} "
                     "found:{selection_count} total: {population_count}") % self.p.name,
                    "max-resource", self.amount, self.selection_count, self.population_count)

        if self.percentage_amount:
            if self.selection_count > self.percentage_amount and self.op != "and":
                raise ResourceLimitExceeded(
                    ("policy:%s exceeded resource-limit:{limit}%% "
                     "found:{selection_count} total:{population_count}") % self.p.name,
                    "max-percent", self.percent, self.selection_count, self.population_count)

704 

705 

class ChildResourceManager(QueryResourceManager):
    """Resource manager for resources that are queried through a parent."""

    child_source = 'describe-child'

    @property
    def source_type(self):
        """Configured source name, with ``describe`` mapped to ``child_source``."""
        configured = self.data.get('source', self.child_source)
        return self.child_source if configured == 'describe' else configured

    def get_parent_manager(self):
        """Return the resource manager for the parent resource type."""
        parent_type = self.resource_type.parent_spec[0]
        return self.get_resource_manager(parent_type)

719 

720 

721def _batch_augment(manager, model, detail_spec, client, resource_set): 

722 detail_op, param_name, param_key, detail_path, detail_args = detail_spec 

723 op = getattr(client, detail_op) 

724 if manager.retry: 

725 args = (op,) 

726 op = manager.retry 

727 else: 

728 args = () 

729 kw = {param_name: [param_key and r[param_key] or r for r in resource_set]} 

730 if detail_args: 

731 kw.update(detail_args) 

732 response = op(*args, **kw) 

733 return response[detail_path] 

734 

735 

736def _scalar_augment(manager, model, detail_spec, client, resource_set): 

737 detail_op, param_name, param_key, detail_path = detail_spec 

738 op = getattr(client, detail_op) 

739 if manager.retry: 

740 args = (op,) 

741 op = manager.retry 

742 else: 

743 args = () 

744 results = [] 

745 for r in resource_set: 

746 kw = {param_name: param_key and r[param_key] or r} 

747 response = op(*args, **kw) 

748 if detail_path: 

749 response = response[detail_path] 

750 else: 

751 response.pop('ResponseMetadata') 

752 if param_key is None: 

753 response[model.id] = r 

754 r = response 

755 else: 

756 r.update(response) 

757 results.append(r) 

758 return results 

759 

760 

class RetryPageIterator(PageIterator):
    """Page iterator that issues every page request through ``retry``."""

    retry = staticmethod(QueryResourceManager.retry)

    def _make_request(self, current_kwargs):
        method = self._method
        return self.retry(method, **current_kwargs)

767 

768 

class TypeMeta(type):
    """Metaclass giving resource type classes a descriptive ``repr``."""

    def __repr__(cls):
        """Return ``<TypeInfo ...>`` using the first identifier available.

        Preference order: ``config_type``, ``cfn_type``, then a name
        synthesized from ``service`` plus ``arn_type``, the second element
        of ``enum_spec`` or ``id``, and finally the class name.
        """
        def wrap(identifier):
            return "<TypeInfo %s>" % identifier

        if cls.config_type:
            return wrap(cls.config_type)
        if cls.cfn_type:
            return wrap(cls.cfn_type)
        if cls.arn_type and cls.service:
            return wrap("AWS::%s::%s" % (cls.service.title(), cls.arn_type.title()))
        if cls.enum_spec and cls.service:
            return wrap("AWS::%s::%s" % (cls.service.title(), cls.enum_spec[1]))
        if cls.service:
            return wrap("AWS::%s::%s" % (cls.service.title(), cls.id))
        return wrap(cls.__name__)

785 

786 

class TypeInfo(metaclass=TypeMeta):
    """
    Resource Type Metadata

    Subclasses override the attributes below to describe a resource type.

    **Required**

    :param id: Identifier used for apis
    :param name: Used for display
    :param service: Which aws service (per sdk) has the api for this resource
    :param enum_spec: Used to query the resource by describe-sources

    **Permissions - Optional**

    :param permission_prefix: Permission string prefix if not service
    :param permissions_enum: Permissions for resource enumeration/get.
        Normally we autogen but in some cases we need to specify statically
    :param permissions_augment: Permissions for resource augment

    **Arn handling / generation metadata - Optional**

    :param arn: Arn resource attribute, when describe format has arn
    :param arn_type: Type, used for arn construction, also required for universal tag augment
    :param arn_separator: How arn type is separated from rest of arn
    :param arn_service: For services that need custom labeling for arns

    **Resource retrieval - Optional**

    :param filter_name: When fetching a single resource via enum_spec this is technically
        optional, but effectively required for serverless event policies else we have to
        enumerate the population
    :param filter_type: filter_type, scalar or list
    :param detail_spec: Used to enrich the resource descriptions returned by enum_spec
    :param batch_detail_spec: Used when the api supports getting resource details en masse

    **Misc - Optional**

    :param default_report_fields: Used for reporting, array of fields
    :param date: Latest date associated to resource, generally references either create date or
        modified date
    :param dimension: Defines that resource has cloud watch metrics and the resource id can be
        passed as this value. Further customizations of dimensions require subclass metrics filter
    :param cfn_type: AWS Cloudformation type
    :param config_type: AWS Config Service resource type name
    :param config_id: Resource attribute that maps to the resourceId field in AWS Config. Intended
        for resources which use one ID attribute for service API calls and a different one for
        AWS Config (example: IAM resources).
    :param universal_taggable: Whether or not resource group tagging api can be used, in which case
        we'll automatically register tag actions/filters. Note: values of True will register legacy
        tag filters/actions, values of object() will just register current standard
        tag/filters/actions.
    :param global_resource: Denotes if this resource exists across all regions (iam, cloudfront,
        r53)
    :param metrics_namespace: Generally we utilize a service to namespace mapping in the metrics
        filter. However some resources have a type specific namespace (e.g. ebs)
    :param id_prefix: Specific to ec2 service resources used to disambiguate a resource by its id

    """

    # -- Required ---------------------------------------------------------
    id = None
    name = None
    service = None
    enum_spec = None

    # -- Permissions ------------------------------------------------------
    permission_prefix = None
    permissions_enum = None
    permissions_augment = None

    # -- Arn handling / generation metadata -------------------------------
    arn = None
    arn_type = None
    arn_separator = "/"
    arn_service = None

    # -- Resource retrieval -----------------------------------------------
    filter_name = None
    filter_type = None
    detail_spec = None
    batch_detail_spec = None

    # -- Misc -------------------------------------------------------------
    default_report_fields = ()
    date = None
    dimension = None
    cfn_type = None
    config_type = None
    config_id = None
    universal_taggable = False
    global_resource = False
    metrics_namespace = None
    id_prefix = None