Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/servicecatalog.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5from c7n.actions import BaseAction 

6from c7n.filters import CrossAccountAccessFilter 

7from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo 

8from c7n.manager import resources 

9from c7n.exceptions import PolicyValidationError 

10from c7n.tags import universal_augment 

11from c7n.utils import local_session, type_schema 

12 

13 

class DescribePortfolio(DescribeSource):
    """Describe source for portfolios that layers universal tag augmentation on top."""

    def augment(self, resources):
        """Run the parent ``augment`` and pass its result through ``universal_augment``."""
        described = super().augment(resources)
        return universal_augment(self.manager, described)

18 

19 

@resources.register('catalog-portfolio')
class CatalogPortfolio(QueryResourceManager):
    """Resource manager registered under the ``catalog-portfolio`` resource name."""

    class resource_type(TypeInfo):
        # API wiring: service client name plus the enumerate/detail call specs.
        service = 'servicecatalog'
        enum_spec = ('list_portfolios', 'PortfolioDetails', None)
        detail_spec = ('describe_portfolio', 'Id', 'Id', None)

        # Keys on each portfolio record used for identity and display.
        id = 'Id'
        name = 'DisplayName'
        date = 'CreatedTime'
        arn = 'ARN'
        arn_type = 'portfolio'

        # Config and CloudFormation share the same type name.
        config_type = 'AWS::ServiceCatalog::Portfolio'
        cfn_type = config_type

        # Tagging support and the extra permission declared for augmentation.
        universal_taggable = object()
        permissions_augment = ("servicecatalog:ListTagsForResource",)

    # Source name -> source implementation.
    source_mapping = dict(describe=DescribePortfolio, config=ConfigSource)

40 

41 

@CatalogPortfolio.action_registry.register('delete')
class CatalogPortfolioDeleteAction(BaseAction):
    """Delete the matched Service Catalog portfolios.

    :example:

    .. code-block:: yaml

      policies:
        - name: service-catalog-portfolio-delete
          resource: aws.catalog-portfolio
          filters:
            - type: cross-account
          actions:
            - delete
    """

    schema = type_schema('delete')
    permissions = ('servicecatalog:DeletePortfolio',)

    def process(self, portfolios):
        """Call ``delete_portfolio`` for every portfolio, via the manager's retry helper.

        ``ResourceNotFoundException`` is passed as an ignored error code.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('servicecatalog')
        ignored_codes = ('ResourceNotFoundException',)
        for portfolio in portfolios:
            self.manager.retry(
                client.delete_portfolio,
                Id=portfolio['Id'],
                ignore_err_codes=ignored_codes)

67 

68 

@CatalogPortfolio.filter_registry.register('cross-account')
class CatalogPortfolioCrossAccount(CrossAccountAccessFilter):
    """Check for account ids that the service catalog portfolio is shared with

    :example:

    .. code-block:: yaml

      policies:
        - name: catalog-portfolio-cross-account
          resource: aws.catalog-portfolio
          filters:
            - type: cross-account
    """

    schema = type_schema(
        'cross-account',
        whitelist_from={'$ref': '#/definitions/filters_common/value_from'},
        whitelist={'type': 'array', 'items': {'type': 'string'}})

    permissions = ('servicecatalog:ListPortfolioAccess',)
    annotation_key = 'c7n:CrossAccountViolations'

    def check_access(self, client, accounts, resources):
        """Return the portfolios shared with accounts outside ``accounts``.

        Each returned portfolio is annotated in place under ``annotation_key``
        with the sorted list of account ids that are not in ``accounts``.

        :param client: servicecatalog client used for ``list_portfolio_access``.
        :param accounts: collection of allowed account ids.
        :param resources: portfolio records; each must carry an ``Id`` key.
        :returns: list of the portfolios that have at least one disallowed share.
        """
        results = []
        for r in resources:
            response = self.manager.retry(
                client.list_portfolio_access,
                PortfolioId=r['Id'],
                ignore_err_codes=('ResourceNotFoundException',))
            # Guard against a missing response: when the ignored
            # ResourceNotFoundException is swallowed there may be nothing to
            # read, and chaining .get() directly would raise AttributeError.
            # NOTE(review): only a single response page is consumed here --
            # confirm whether NextPageToken needs to be followed.
            shared_accounts = (response or {}).get('AccountIds')
            if not shared_accounts:
                continue
            delta_accounts = set(shared_accounts).difference(accounts)
            if delta_accounts:
                # Sorted so the annotation is deterministic across runs.
                r[self.annotation_key] = sorted(delta_accounts)
                results.append(r)
        return results

    def process(self, resources, event=None):
        """Filter ``resources`` down to portfolios with disallowed account shares."""
        client = local_session(self.manager.session_factory).client('servicecatalog')
        accounts = self.get_accounts()
        return self.check_access(client, accounts, resources)

113 

114 

@CatalogPortfolio.action_registry.register('remove-shared-accounts')
class RemoveSharedAccounts(BaseAction):
    """Action to delete Portfolio share with other accounts

    ``accounts`` is either the literal ``matched`` (use the account ids
    annotated by the ``cross-account`` filter) or an explicit list of
    12-digit account ids.

    :example:

    .. code-block:: yaml

      policies:
        - name: catalog-portfolio-delete-share
          resource: aws.catalog-portfolio
          filters:
            - type: cross-account
          actions:
            - type: remove-shared-accounts
              accounts: matched

        - name: catalog-portfolio-delete-shared-account
          resource: aws.catalog-portfolio
          filters:
            - type: cross-account
          actions:
            - type: remove-shared-accounts
              accounts: ['123456789123']
    """

    schema = type_schema(
        'remove-shared-accounts',
        accounts={'oneOf': [
            {'enum': ['matched']},
            {'type': 'array', 'items': {'type': 'string', 'pattern': '^[0-9]{12}$'}}]},
        required=['accounts'])

    permissions = ('servicecatalog:DeletePortfolioShare',)

    def validate(self):
        """Require a cross-account filter on the policy when ``accounts`` is ``matched``.

        :raises PolicyValidationError: if ``accounts`` is ``matched`` and none of
            the policy's filters is a ``CatalogPortfolioCrossAccount``.
        """
        if self.data['accounts'] != 'matched':
            return
        has_cross_account = any(
            isinstance(f, CatalogPortfolioCrossAccount)
            for f in self.manager.iter_filters())
        if not has_cross_account:
            raise PolicyValidationError(
                "policy:%s action:%s with matched requires cross-account filter" % (
                    self.manager.ctx.policy.name, self.type))

    def delete_shared_accounts(self, client, portfolio):
        """Delete the portfolio share for each targeted account.

        :param client: servicecatalog client used for ``delete_portfolio_share``.
        :param portfolio: portfolio record; must carry an ``Id`` key.
        """
        accounts = self.data.get('accounts')
        if accounts == 'matched':
            # A portfolio that was never annotated by the cross-account filter
            # yields None here; fall back to an empty sequence rather than
            # failing with "NoneType is not iterable".
            accounts = portfolio.get(CatalogPortfolioCrossAccount.annotation_key) or ()
        for account in accounts:
            client.delete_portfolio_share(PortfolioId=portfolio['Id'], AccountId=account)

    def process(self, portfolios):
        """Remove the configured account shares from every portfolio in ``portfolios``."""
        client = local_session(self.manager.session_factory).client('servicecatalog')
        for p in portfolios:
            self.delete_shared_accounts(client, p)

174 

175 

176@resources.register('catalog-product') 

177class CatalogProduct(QueryResourceManager): 

178 

179 class resource_type(TypeInfo): 

180 service = 'servicecatalog' 

181 arn_type = 'product' 

182 enum_spec = ('search_products_as_admin', 'ProductViewDetails[].ProductViewSummary', None) 

183 detail_spec = ('describe_product_as_admin', 'Id', 'ProductId', None) 

184 id = 'ProductId' 

185 name = 'Name' 

186 arn = 'ProductARN' 

187 date = 'CreatedTime' 

188 universal_taggable = object() 

189 config_type = cfn_type = 'AWS::ServiceCatalog::CloudFormationProduct' 

190 permissions_augment = ("servicecatalog:ListTagsForResource",)