Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/ctx.py: 18%

78 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import time 

4import uuid 

5import os 

6 

7 

8from c7n.output import ( 

9 api_stats_outputs, 

10 blob_outputs, 

11 log_outputs, 

12 metrics_outputs, 

13 sys_stats_outputs, 

14 tracer_outputs, 

15) 

16 

17from c7n.utils import reset_session_cache, dumps, local_session 

18from c7n.version import version 

19 

20 

class ExecutionContext:
    """Policy Execution Context.

    Context manager that wires together the outputs used while a policy
    runs (blob output, logs, metrics, tracer, api stats and system stats)
    and writes a ``metadata.json`` summary through the blob output on exit.

    Only the metrics and tracer outputs are selected at construction time;
    the remaining outputs are selected by :meth:`initialize`, which
    :meth:`__enter__` calls.
    """

    def __init__(self, session_factory, policy, options):
        """Store the collaborators and select the metrics and tracer outputs.

        Args:
            session_factory: Session factory. Its ``policy_name`` attribute
                is set on enter and cleared on exit, and its optional
                ``update`` callable is invoked on enter.
            policy: Policy being executed; ``name``, ``data`` and
                ``provider_name`` are read from it.
            options: Run configuration. ``metrics``, ``metrics_enabled``,
                ``tracer``, ``output_dir`` and ``log_group`` are read as
                attributes, and it is converted with ``dict()`` for the
                metadata document.
        """
        self.policy = policy
        self.options = options
        self.session_factory = session_factory

        # Runtime initialized during policy execution
        # We treat policies as a fly weight pre-execution.
        self.start_time = None
        self.execution_id = None
        self.output = None
        self.logs = None
        # Declared here like the other runtime attributes so it can be read
        # (e.g. by __exit__) even if initialize() has not run yet.
        self.output_logs = None
        self.api_stats = None
        self.sys_stats = None

        # A few tests patch on metrics flush
        # For backward compatibility, accept both 'metrics' and 'metrics_enabled' params (PR #4361)
        metrics = self.options.metrics or self.options.metrics_enabled
        self.metrics = metrics_outputs.select(metrics, self)

        # Tracer is wired into core filtering code / which is getting
        # invoked sans execution context entry in tests
        self.tracer = tracer_outputs.select(self.options.tracer, self)

    def initialize(self):
        """Select the runtime outputs and stamp the start of the execution.

        Sets ``output``, ``logs``, ``output_logs``, ``api_stats`` and
        ``sys_stats``, then records ``start_time`` (``time.time()``) and a
        fresh ``execution_id`` (a UUID4 string).
        """
        self.output = blob_outputs.select(self.options.output_dir, self)
        self.logs = log_outputs.select(self.options.log_group, self)

        # Always do file/blob storage outputs
        # When the selected log output is neither the 'default' nor the
        # 'null' implementation, additionally select the output for a None
        # selector so logs are also captured that way.
        self.output_logs = None
        if not isinstance(self.logs, (log_outputs['default'], log_outputs['null'])):
            self.output_logs = log_outputs.select(None, self)

        # Look for customizations, but fallback to default
        for api_stats_type in (self.policy.provider_name, 'default'):
            if api_stats_type in api_stats_outputs:
                self.api_stats = api_stats_outputs.select(api_stats_type, self)
                break
        for sys_stats_type in ('psutil', 'default'):
            if sys_stats_type in sys_stats_outputs:
                self.sys_stats = sys_stats_outputs.select(sys_stats_type, self)
                break

        self.start_time = time.time()
        self.execution_id = str(uuid.uuid4())

    @property
    def log_dir(self):
        """Return ``root_dir`` of the blob output (set by ``initialize``)."""
        return self.output.root_dir

    def __enter__(self):
        """Initialize the context, enter every output and return ``self``."""
        self.initialize()
        self.session_factory.policy_name = self.policy.name
        # NOTE(review): sys_stats is entered here but has no matching
        # __exit__ call in this class -- confirm that is intentional.
        self.sys_stats.__enter__()
        self.output.__enter__()
        self.logs.__enter__()
        if self.output_logs:
            self.output_logs.__enter__()

        self.api_stats.__enter__()
        self.tracer.__enter__()

        # Api stats and user agent modification by policy require updating
        # in place the cached session thread local.
        update_session = getattr(self.session_factory, 'update', None)
        if update_session:
            update_session(local_session(self.session_factory))
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Write execution metadata and exit the outputs.

        When an exception is propagating and a metrics output is set, a
        ``PolicyException`` count metric is recorded first. Returns ``None``,
        so an in-flight exception is not suppressed.
        """
        if exc_type is not None and self.metrics:
            self.metrics.put_metric('PolicyException', 1, "Count")
        self.output.write_file('metadata.json', dumps(self.get_metadata(), indent=2))
        self.api_stats.__exit__(exc_type, exc_value, exc_traceback)

        # Flushing metrics and closing the log/blob outputs happens inside
        # an 'output' tracer subsegment.
        with self.tracer.subsegment('output'):
            self.metrics.flush()
            self.logs.__exit__(exc_type, exc_value, exc_traceback)
            if self.output_logs:
                self.output_logs.__exit__(exc_type, exc_value, exc_traceback)
            self.output.__exit__(exc_type, exc_value, exc_traceback)

        self.tracer.__exit__()

        self.session_factory.policy_name = None
        # IMPORTANT: multi-account execution (c7n-org and others) need
        # to manually reset this. Why: Not doing this means we get
        # excessive memory usage from client reconstruction for dynamic-gen
        # sdks.
        if os.environ.get('C7N_TEST_RUN'):
            reset_session_cache()

    def get_metadata(self, include=('sys-stats', 'api-stats', 'metrics')):
        """Build the metadata dictionary describing this execution.

        Args:
            include: Names of the optional sections to add; each of
                ``'sys-stats'``, ``'api-stats'`` and ``'metrics'`` is added
                only when listed here and the matching output is truthy.

        Returns:
            dict with ``policy``, ``version``, ``execution`` (``id``,
            ``start``, ``end_time``, ``duration``) and ``config`` keys, plus
            the requested optional sections.
        """
        t = time.time()
        md = {
            'policy': self.policy.data,
            'version': version,
            'execution': {
                'id': self.execution_id,
                'start': self.start_time,
                'end_time': t,
                'duration': t - self.start_time,
            },
            'config': dict(self.options),
        }

        if 'sys-stats' in include and self.sys_stats:
            md['sys-stats'] = self.sys_stats.get_metadata()
        if 'api-stats' in include and self.api_stats:
            md['api-stats'] = self.api_stats.get_metadata()
        if 'metrics' in include and self.metrics:
            md['metrics'] = self.metrics.get_metadata()
        return md