Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/servicecatalog.py: 60%

92 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5from c7n.actions import BaseAction 

6from c7n.filters import CrossAccountAccessFilter 

7from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo 

8from c7n.manager import resources 

9from c7n.exceptions import PolicyValidationError 

10from c7n.tags import universal_augment 

11from c7n.utils import local_session, type_schema 

12 

13 

14class DescribePortfolio(DescribeSource): 

15 

16 def augment(self, resources): 

17 return universal_augment(self.manager, super().augment(resources)) 

18 

19 

20@resources.register('catalog-portfolio') 

21class CatalogPortfolio(QueryResourceManager): 

22 

23 class resource_type(TypeInfo): 

24 service = 'servicecatalog' 

25 enum_spec = ('list_portfolios', 'PortfolioDetails', None) 

26 detail_spec = ('describe_portfolio', 'Id', 'Id', None) 

27 arn = 'ARN' 

28 arn_type = 'portfolio' 

29 id = 'Id' 

30 name = 'DisplayName' 

31 date = 'CreatedTime' 

32 universal_taggable = object() 

33 cfn_type = config_type = 'AWS::ServiceCatalog::Portfolio' 

34 

35 source_mapping = { 

36 'describe': DescribePortfolio, 

37 'config': ConfigSource 

38 } 

39 

40 

41@CatalogPortfolio.action_registry.register('delete') 

42class CatalogPortfolioDeleteAction(BaseAction): 

43 """Action to delete a Service Catalog Portfolio 

44 

45 :example: 

46 

47 .. code-block:: yaml 

48 

49 policies: 

50 - name: service-catalog-portfolio-delete 

51 resource: aws.catalog-portfolio 

52 filters: 

53 - type: cross-account 

54 actions: 

55 - delete 

56 """ 

57 

58 schema = type_schema('delete') 

59 permissions = ('servicecatalog:DeletePortfolio',) 

60 

61 def process(self, portfolios): 

62 client = local_session(self.manager.session_factory).client('servicecatalog') 

63 for r in portfolios: 

64 self.manager.retry(client.delete_portfolio, Id=r['Id'], ignore_err_codes=( 

65 'ResourceNotFoundException',)) 

66 

67 

68@CatalogPortfolio.filter_registry.register('cross-account') 

69class CatalogPortfolioCrossAccount(CrossAccountAccessFilter): 

70 """Check for account ids that the service catalog portfolio is shared with 

71 

72 :example: 

73 

74 .. code-block:: yaml 

75 

76 policies: 

77 - name: catalog-portfolio-cross-account 

78 resource: aws.catalog-portfolio 

79 filters: 

80 - type: cross-account 

81 """ 

82 

83 schema = type_schema( 

84 'cross-account', 

85 whitelist_from={'$ref': '#/definitions/filters_common/value_from'}, 

86 whitelist={'type': 'array', 'items': {'type': 'string'}}) 

87 

88 permissions = ('servicecatalog:ListPortfolioAccess',) 

89 annotation_key = 'c7n:CrossAccountViolations' 

90 

91 def check_access(self, client, accounts, resources): 

92 results = [] 

93 for r in resources: 

94 shared_accounts = self.manager.retry( 

95 client.list_portfolio_access, PortfolioId=r['Id'], ignore_err_codes=( 

96 'ResourceNotFoundException',)).get('AccountIds') 

97 if not shared_accounts: 

98 continue 

99 shared_accounts = set(shared_accounts) 

100 delta_accounts = shared_accounts.difference(accounts) 

101 if delta_accounts: 

102 r[self.annotation_key] = list(delta_accounts) 

103 results.append(r) 

104 return results 

105 

106 def process(self, resources, event=None): 

107 results = [] 

108 client = local_session(self.manager.session_factory).client('servicecatalog') 

109 accounts = self.get_accounts() 

110 results.extend(self.check_access(client, accounts, resources)) 

111 return results 

112 

113 

114@CatalogPortfolio.action_registry.register('remove-shared-accounts') 

115class RemoveSharedAccounts(BaseAction): 

116 """Action to delete Portfolio share with other accounts 

117 

118 :example: 

119 

120 .. code-block:: yaml 

121 

122 policies: 

123 - name: catalog-portfolio-delete-share 

124 resource: aws.catalog-portfolio 

125 filters: 

126 - type: cross-account 

127 actions: 

128 - type: remove-shared-accounts 

129 accounts: matched 

130 

131 - name: catalog-portfolio-delete-shared-account 

132 resource: aws.catalog-portfolio 

133 filters: 

134 - type: cross-account 

135 actions: 

136 - type: remove-shared-accounts 

137 accounts: ['123456789123'] 

138 """ 

139 

140 schema = type_schema( 

141 'remove-shared-accounts', 

142 accounts={'oneOf': [ 

143 {'enum': ['matched']}, 

144 {'type': 'array', 'items': {'type': 'string', 'pattern': '^[0-9]{12}$'}}]}, 

145 required=['accounts']) 

146 

147 permissions = ('servicecatalog:DeletePortfolioShare',) 

148 

149 def validate(self): 

150 if self.data['accounts'] != 'matched': 

151 return 

152 found = False 

153 for f in self.manager.iter_filters(): 

154 if isinstance(f, CatalogPortfolioCrossAccount): 

155 found = True 

156 break 

157 if not found: 

158 raise PolicyValidationError( 

159 "policy:%s action:%s with matched requires cross-account filter" % ( 

160 self.manager.ctx.policy.name, self.type)) 

161 

162 def delete_shared_accounts(self, client, portfolio): 

163 accounts = self.data.get('accounts') 

164 if accounts == 'matched': 

165 accounts = portfolio.get(CatalogPortfolioCrossAccount.annotation_key) 

166 for account in accounts: 

167 client.delete_portfolio_share(PortfolioId=portfolio['Id'], AccountId=account) 

168 

169 def process(self, portfolios): 

170 client = local_session(self.manager.session_factory).client('servicecatalog') 

171 for p in portfolios: 

172 self.delete_shared_accounts(client, p) 

173 

174 

175@resources.register('catalog-product') 

176class CatalogProduct(QueryResourceManager): 

177 

178 class resource_type(TypeInfo): 

179 service = 'servicecatalog' 

180 arn_type = 'product' 

181 enum_spec = ('search_products_as_admin', 'ProductViewDetails[].ProductViewSummary', None) 

182 detail_spec = ('describe_product_as_admin', 'Id', 'ProductId', None) 

183 id = 'ProductId' 

184 name = 'Name' 

185 arn = 'ProductARN' 

186 date = 'CreatedTime' 

187 universal_taggable = object() 

188 cfn_type = 'AWS::ServiceCatalog::CloudFormationProduct'