Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/actions/core.py: 74%

46 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Actions to take on resources 

5""" 

6import logging 

7 

8from c7n.element import Element 

9from c7n.exceptions import PolicyValidationError, ClientError 

10from c7n.registry import PluginRegistry 

11 

12 

13class ActionRegistry(PluginRegistry): 

14 

15 def __init__(self, *args, **kw): 

16 super(ActionRegistry, self).__init__(*args, **kw) 

17 # Defer to provider initialization of registry 

18 from .webhook import Webhook 

19 self.register('webhook', Webhook) 

20 

21 def parse(self, data, manager): 

22 results = [] 

23 for d in data: 

24 results.append(self.factory(d, manager)) 

25 return results 

26 

27 def factory(self, data, manager): 

28 if isinstance(data, dict): 

29 action_type = data.get('type') 

30 if action_type is None: 

31 raise PolicyValidationError( 

32 "Invalid action type found in %s" % (data)) 

33 else: 

34 action_type = data 

35 data = {} 

36 

37 action_class = self.get(action_type) 

38 if action_class is None: 

39 raise PolicyValidationError( 

40 "Invalid action type %s, valid actions %s" % ( 

41 action_type, list(self.keys()))) 

42 # Construct a ResourceManager 

43 return action_class(data, manager) 

44 

45 

46class Action(Element): 

47 

48 log = logging.getLogger("custodian.actions") 

49 

50 def __init__(self, data=None, manager=None, log_dir=None): 

51 self.data = data or {} 

52 self.manager = manager 

53 self.log_dir = log_dir 

54 

55 @property 

56 def name(self): 

57 return self.__class__.__name__.lower() 

58 

59 def process(self, resources): 

60 raise NotImplementedError( 

61 "Base action class does not implement behavior") 

62 

63 def _run_api(self, cmd, *args, **kw): 

64 try: 

65 return cmd(*args, **kw) 

66 except ClientError as e: 

67 if (e.response['Error']['Code'] == 'DryRunOperation' and 

68 e.response['ResponseMetadata']['HTTPStatusCode'] == 412 and 

69 'would have succeeded' in e.response['Error']['Message']): 

70 return self.log.info( 

71 "Dry run operation %s succeeded" % ( 

72 self.__class__.__name__.lower())) 

73 raise 

74 

75 

76BaseAction = Action 

77 

78 

79class EventAction(BaseAction): 

80 """Actions which receive lambda event if present 

81 """