Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/sar.py: 56%

48 statements  


1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4 

5from c7n.actions import Action 

6from c7n.filters import CrossAccountAccessFilter 

7from c7n.query import QueryResourceManager, TypeInfo 

8from c7n.manager import resources 

9from c7n.utils import type_schema, local_session 

10 

11 

@resources.register('serverless-app')
class ServerlessApp(QueryResourceManager):
    """Resource manager registered under the ``serverless-app`` resource name.

    All service-specific metadata lives in the nested ``resource_type``.
    """

    class resource_type(TypeInfo):
        # boto3 client name used for this resource.
        service = 'serverlessrepo'
        # The same field, ApplicationId, is used as both the arn and the id.
        arn = 'ApplicationId'
        id = arn
        name = 'Name'
        # presumably (client method, response key, extra arguments) --
        # confirm against TypeInfo.
        enum_spec = (
            'list_applications',
            'Applications',
            None,
        )
        cfn_type = 'AWS::Serverless::Application'
        default_report_fields = [
            'ApplicationId',
            'Name',
            'CreationTime',
            'SpdxLicenseId',
            'Author',
        ]

23 

24 

@ServerlessApp.action_registry.register('delete')
class Delete(Action):
    """Delete every serverless application handed to the action.

    Registered as the ``delete`` action on ``serverless-app``. The only
    permission declared is ``serverlessrepo:DeleteApplication``.
    """

    permissions = ('serverlessrepo:DeleteApplication',)
    schema = type_schema('delete')

    def process(self, resources):
        """Call ``delete_application`` once per resource, by ApplicationId.

        Each call goes through ``self.manager.retry``.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('serverlessrepo')
        # NOTE(review): there is no per-resource error handling here, so an
        # exception that escapes manager.retry stops the loop and leaves the
        # remaining applications undeleted -- confirm that is intended.
        for app in resources:
            app_id = app['ApplicationId']
            self.manager.retry(client.delete_application, ApplicationId=app_id)

38 

39 

@ServerlessApp.filter_registry.register('cross-account')
class CrossAccount(CrossAccountAccessFilter):
    """Cross-account access filter for serverless application policies.

    Fetches each application's policy, rewrites it towards IAM spelling and
    caches it on the resource under ``c7n:Policy``. Evaluation is then
    delegated to ``CrossAccountAccessFilter.process``.
    """

    permissions = ('serverlessrepo:GetApplicationPolicy',)
    policy_attribute = 'c7n:Policy'

    def process(self, resources, event=None):
        """Attach a normalized policy to each resource, then run the base filter.

        A resource that already carries ``c7n:Policy`` is not fetched or
        transformed again.
        """
        session = local_session(self.manager.session_factory)
        client = session.client('serverlessrepo')
        for app in resources:
            if self.policy_attribute in app:
                # Already annotated. transform_policy pops 'Statements', so
                # running it a second time on the same dict would raise.
                continue
            fetched = client.get_application_policy(
                ApplicationId=app['ApplicationId'])
            # Store first, then mutate in place: the cached object and the
            # transformed object are the same dict.
            app[self.policy_attribute] = fetched
            fetched.pop('ResponseMetadata', None)
            self.transform_policy(fetched)
        # NOTE(review): ``event`` is accepted but not forwarded to the base
        # class -- confirm the parent does not need it.
        return super().process(resources)

    def transform_policy(self, policy):
        """Rewrite a serverlessrepo application policy towards IAM spelling.

        The service returns its own policy dialect, which is not a valid IAM
        policy document. This mutates ``policy`` in place and returns it:

        * ``Statements`` is renamed to ``Statement``.
        * every entry of ``Actions`` is prefixed with ``serverlessrepo:``.
        * a statement without ``Effect`` is given ``Allow``.
        * ``Principals`` becomes ``Principal: {'AWS': [...]}``.
        * a non-empty ``PrincipalOrgIDs`` becomes a ``StringEquals``
          condition on ``aws:PrincipalOrgID``.
        """
        statements = policy.pop('Statements')
        policy['Statement'] = statements
        for stmt in statements:
            # NOTE(review): the key stays 'Actions' (plural), not IAM's
            # 'Action' -- verify that the parent checker does not rely on
            # the singular key when matching actions.
            stmt['Actions'] = [
                'serverlessrepo:%s' % action for action in stmt['Actions']]
            stmt.setdefault('Effect', 'Allow')
            if 'Principals' in stmt:
                stmt['Principal'] = {'AWS': stmt.pop('Principals')}
            # An empty or missing org-id list adds no Condition, so the
            # statement is left to be judged on its principals alone.
            org_ids = stmt.pop('PrincipalOrgIDs', None)
            if org_ids:
                stmt['Condition'] = {
                    'StringEquals': {'aws:PrincipalOrgID': org_ids}}
        return policy