Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/output.py: 43%
314 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3"""
4Outputs metrics, logs, stats, traces, and structured records across
5a variety of sinks.
7See docs/usage/outputs.rst
9"""
10import contextlib
11import datetime
12import gzip
13import logging
14import os
15import shutil
16import tempfile
17import time
18import uuid
20from abc import ABC, abstractmethod
22from c7n.exceptions import InvalidOutputConfig
23from c7n.registry import PluginRegistry
24from c7n.utils import parse_url_config, join_output_path
26try:
27 import psutil
29 HAVE_PSUTIL = True
30except ImportError:
31 HAVE_PSUTIL = False
33log = logging.getLogger('custodian.output')
36# TODO remove
37DEFAULT_NAMESPACE = "CloudMaid"
class OutputRegistry(PluginRegistry):
    """Registry of output sinks, selectable by url-style selector strings."""

    default_protocol = None

    def select(self, selector, ctx):
        """Instantiate the registered output matching *selector*.

        An empty selector falls back to the 'default' entry. A bare
        registered name gets '://' appended; otherwise the registry's
        default protocol (if any) is prepended before prefix matching.
        """
        if not selector:
            return self['default'](ctx, {'url': selector})
        if '://' not in selector:
            if selector in self:
                selector = "%s://" % selector
            elif self.default_protocol:
                selector = "%s://%s" % (self.default_protocol, selector)
        for key in self.keys():
            if selector.startswith(key):
                return self[key](ctx, parse_url_config(selector))
        raise InvalidOutputConfig("Invalid %s: %s" % (
            self.plugin_type, selector))
class BlobOutputRegistry(OutputRegistry):
    """Registry of blob/file stores; bare selectors resolve as file://."""

    default_protocol = "file"
class LogOutputRegistry(OutputRegistry):
    """Registry of log sinks; bare selectors resolve as aws://."""

    default_protocol = "aws"
class MetricsRegistry(OutputRegistry):
    """Registry of metric sinks, with legacy boolean config support."""

    def select(self, selector, ctx):
        # Older configs used `metrics: true` to mean the aws sink.
        if selector is True:
            selector = 'aws'
        return super(MetricsRegistry, self).select(selector, ctx)
# Module-level registries, one per output category. Cloud-provider
# modules register concrete implementations against these at import time.
api_stats_outputs = OutputRegistry('c7n.output.api_stats')
blob_outputs = BlobOutputRegistry('c7n.output.blob')
log_outputs = LogOutputRegistry('c7n.output.logs')
metrics_outputs = MetricsRegistry('c7n.output.metrics')
tracer_outputs = OutputRegistry('c7n.output.tracer')
sys_stats_outputs = OutputRegistry('c7n.output.sys_stats')
@tracer_outputs.register('default')
class NullTracer:
    """Tracing provides for detailed analytics of a policy execution.

    Uses native cloud provider integration (xray, stack driver trace).
    """

    def __init__(self, ctx, config=None):
        self.ctx, self.config = ctx, config or {}

    @contextlib.contextmanager
    def subsegment(self, name):
        """Create a named subsegment as a context manager."""
        yield self

    def __enter__(self):
        """Enter main segment for policy execution."""

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Exit main segment for policy execution."""
class DeltaStats:
    """Capture stats (dictionary of string->integer) as a stack.

    Popping the stack automatically creates a delta of the last
    stack element to the current stats.
    """

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}
        self.snapshot_stack = []

    def push_snapshot(self):
        """Record the current stats snapshot on the stack."""
        self.snapshot_stack.append(self.get_snapshot())

    def pop_snapshot(self):
        """Pop the last snapshot, returning its delta to the current stats."""
        previous = self.snapshot_stack.pop()
        return self.delta(previous, self.get_snapshot())

    def get_snapshot(self):
        """Subclasses return the current stats mapping; the base has none."""
        return {}

    def delta(self, before, after):
        """Return the non-zero per-key differences of *after* minus *before*."""
        return {
            key: after[key] - before[key]
            for key in before
            if after[key] - before[key]
        }
@sys_stats_outputs.register('default')
@api_stats_outputs.register('default')
class NullStats:
    """Execution statistics/metrics collection.

    Encompasses concrete implementations over system stats (memory, cpu, cache size)
    and api calls.

    The api supports stack nested snapshots, with delta consumption to support
    tracing metadata annotation across nested subsegments.
    """

    def __init__(self, ctx, config=None):
        # ctx: policy execution context; config: optional sink configuration.
        self.ctx = ctx
        self.config = config or {}

    def push_snapshot(self):
        """Take a snapshot of the system stats and append to the stack."""

    def pop_snapshot(self):
        """Remove a snapshot from the stack and return a delta of the current stats to it.

        Null implementation always returns an empty mapping.
        """
        return {}

    def get_metadata(self):
        """Return delta of current to last snapshot, without popping.

        Null implementation always returns an empty mapping.
        """
        return {}

    def __enter__(self):
        """Push a snapshot
        """

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Pop a snapshot
        """
@sys_stats_outputs.register('psutil', condition=HAVE_PSUTIL)
class SystemStats(DeltaStats):
    """Collect process statistics via psutil as deltas over policy execution.
    """
    def __init__(self, ctx, config=None):
        super(SystemStats, self).__init__(ctx, config)
        self.process = psutil.Process(os.getpid())

    def __enter__(self):
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        # Bug fix: the context manager protocol always calls
        # __exit__(exc_type, exc_value, traceback); the previous
        # zero-argument signature raised TypeError when this class was
        # used in a `with` block (cf. NullStats/LogOutput signatures).
        self.pop_snapshot()

    def get_metadata(self):
        """Return delta of current stats to the last snapshot, without popping."""
        if self.snapshot_stack:
            return self.delta(self.snapshot_stack[-1], self.get_snapshot())
        return self.get_snapshot()

    def get_snapshot(self):
        """Capture a point-in-time mapping of numeric process counters."""
        snapshot = {
            'num_threads': self.process.num_threads(),
            'snapshot_time': time.time(),
            'cache_size': self.ctx.policy.get_cache().size()
        }

        # no num_fds on Windows, but likely num_handles
        if hasattr(self.process, "num_fds"):
            snapshot['num_fds'] = self.process.num_fds()
        elif hasattr(self.process, "num_handles"):
            snapshot['num_handles'] = self.process.num_handles()

        with self.process.oneshot():
            # simpler would be json.dumps(self.process.as_dict()), but
            # that complicates delta diffing between snapshots.
            cpu_time = self.process.cpu_times()
            snapshot['cpu_user'] = cpu_time.user
            snapshot['cpu_system'] = cpu_time.system
            (snapshot['num_ctx_switches_voluntary'],
             snapshot['num_ctx_switches_involuntary']) = self.process.num_ctx_switches()
            # io counters (not available on osx)
            if getattr(self.process, 'io_counters', None):
                try:
                    io = self.process.io_counters()
                    for counter in (
                            'read_count', 'write_count',
                            'write_bytes', 'read_bytes'):
                        snapshot[counter] = getattr(io, counter)
                except NotImplementedError:
                    # some old kernels and Windows Linux Subsystem throw this
                    pass
            # memory counters
            mem = self.process.memory_info()
            for counter in (
                    'rss', 'vms', 'shared', 'text', 'data', 'lib',
                    'pfaults', 'pageins'):
                v = getattr(mem, counter, None)
                if v is not None:
                    snapshot[counter] = v
        return snapshot
class Metrics:
    """Abstract base for metric sinks.

    Datapoints are formatted by subclasses (_format_metric), buffered,
    and delivered in batches (_put_metrics) of at most BUFFER_SIZE.
    """

    permissions = ()
    namespace = DEFAULT_NAMESPACE
    BUFFER_SIZE = 20

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config
        self.buf = []

    def _format_metric(self, key, value, unit, dimensions):
        # message spelling fixed ("responsiblity") for consistency with
        # BlobOutput.upload_file
        raise NotImplementedError("subclass responsibility")

    def _put_metrics(self, ns, metrics):
        raise NotImplementedError("subclass responsibility")

    def flush(self):
        """Deliver any buffered datapoints and clear the buffer."""
        if self.buf:
            self._put_metrics(self.namespace, self.buf)
            self.buf = []

    def put_metric(self, key, value, unit, buffer=True, **dimensions):
        """Queue a datapoint; flush when the buffer fills or buffer=False."""
        point = self._format_metric(key, value, unit, dimensions)
        self.buf.append(point)
        if buffer:
            # Max metrics in a single request
            if len(self.buf) >= self.BUFFER_SIZE:
                self.flush()
        else:
            self.flush()

    def get_metadata(self):
        """Return a copy of the currently buffered datapoints."""
        return list(self.buf)
@metrics_outputs.register('default')
class LogMetrics(Metrics):
    """Default metrics collection.

    logs metrics, default handler should send to stderr
    """

    def _put_metrics(self, ns, metrics):
        # Skip the high-volume timing metrics when logging.
        for metric in metrics:
            if metric['MetricName'] in ('ActionTime', 'ResourceTime'):
                continue
            log.debug(self.render_metric(metric))

    def render_metric(self, m):
        parts = ["metric:%s %s:%s" % (m['MetricName'], m['Unit'], m['Value'])]
        parts.extend(
            " %s:%s" % (d['Name'].lower(), d['Value'].lower())
            for d in m['Dimensions'])
        return "".join(parts)

    def _format_metric(self, key, value, unit, dimensions):
        dims = [
            {"Name": "Policy", "Value": self.ctx.policy.name},
            {"Name": "ResType", "Value": self.ctx.policy.resource_type},
        ]
        dims.extend(
            {"Name": name, "Value": val} for name, val in dimensions.items())
        return {
            "MetricName": key,
            "Timestamp": datetime.datetime.now(),
            "Value": value,
            "Unit": unit,
            "Dimensions": dims,
        }

    def get_metadata(self):
        # Dimensions are dropped; metadata consumers only want the points.
        return [
            {key: val for key, val in point.items() if key != 'Dimensions'}
            for point in self.buf
        ]
class LogOutput:
    """Context manager base that attaches a handler to the custodian logger
    for the duration of a policy execution."""

    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}
        self.handler = None

    def get_handler(self):
        """Subclasses return a configured logging handler, or None."""
        raise NotImplementedError()

    def __enter__(self):
        log.debug("Storing output with %s" % repr(self))
        self.join_log()
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if exc_type is not None:
            log.exception("Error while executing policy")
        self.leave_log()

    def join_log(self):
        """Attach our handler to the 'custodian' logger tree."""
        handler = self.get_handler()
        self.handler = handler
        if handler is None:
            return
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(logging.Formatter(self.log_format))
        logging.getLogger('custodian').addHandler(handler)

    def leave_log(self):
        """Detach, flush, and close our handler."""
        handler = self.handler
        if handler is None:
            return
        logging.getLogger('custodian').removeHandler(handler)
        handler.flush()
        handler.close()
@log_outputs.register('default')
class LogFile(LogOutput):
    """Persist the policy run log to a file under the policy's log dir."""

    def __repr__(self):
        return "<LogFile file://%s>" % self.log_path

    @property
    def log_path(self):
        return os.path.join(self.ctx.log_dir, 'custodian-run.log')

    def get_handler(self):
        """File handler appending to custodian-run.log."""
        return logging.FileHandler(self.log_path)
@log_outputs.register('null')
class NullLog(LogOutput):
    # default - for unit tests

    def __repr__(self):
        return "<Null Log>"

    @property
    def log_path(self):
        # fixed dummy path; nothing is ever written here
        return "xyz/log.txt"

    def get_handler(self):
        """No handler - log output is discarded."""
        return None
class OutputFileHandler(ABC):
    """Base class for types registered with the blob_outputs registry.

    Provides explicit interface definition for the types.

    The file handlers are treated as context managers.
    """

    # Injected by the register method, matches the string type passed.
    type: str
    # The base directory that will hold the output files.
    root_dir: str

    @abstractmethod
    def __enter__(self):
        raise NotImplementedError()

    @abstractmethod
    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        raise NotImplementedError()

    @abstractmethod
    def write_file(self, rel_path, value):
        """Write a file at the relative path specified with the value as the content."""
        raise NotImplementedError()
@blob_outputs.register('null')
class NullBlobOutput(OutputFileHandler):
    # default - for unit tests

    def __init__(self, ctx, config):
        self.ctx, self.config = ctx, config
        # fixed dummy root; nothing is ever written
        self.root_dir = 'xyz'

    def __repr__(self):
        return "<null blob output>"

    def __enter__(self):
        "A no-op for the null handler."

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        "A no-op for the null handler."

    def write_file(self, rel_path, value):
        "A no-op for the null handler."
@blob_outputs.register('file')
@blob_outputs.register('default')
class DirectoryOutput(OutputFileHandler):
    """Write policy output records to a directory on the local filesystem."""

    permissions = ()

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config

        output_path = self.get_output_path(config['url'])
        if output_path.startswith('file://'):
            output_path = output_path[len('file://'):]

        self.root_dir = output_path
        if self.root_dir:
            # exist_ok=True closes the TOCTOU race between a separate
            # os.path.exists() check and directory creation when
            # concurrent executions share an output directory.
            os.makedirs(self.root_dir, exist_ok=True)

    def __enter__(self):
        return

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        return

    def __repr__(self):
        return "<%s to dir:%s>" % (self.__class__.__name__, self.root_dir)

    def write_file(self, rel_path, value):
        """Write *value* as text to root_dir/rel_path."""
        with open(os.path.join(self.root_dir, rel_path), 'w') as fh:
            fh.write(value)

    def compress(self):
        # Compress files individually so thats easy to walk them, without
        # downloading tar and extracting.
        for root, dirs, files in os.walk(self.root_dir):
            for f in files:
                fp = os.path.join(root, f)
                with gzip.open(fp + ".gz", "wb", compresslevel=7) as zfh:
                    with open(fp, "rb") as sfh:
                        shutil.copyfileobj(sfh, zfh, length=2**15)
                os.remove(fp)

    def get_output_path(self, output_url):
        """Resolve the output url; plain urls get the policy name appended,
        urls containing {}-placeholders are interpolated from get_output_vars().
        """
        if '{' not in output_url:
            return os.path.join(output_url, self.ctx.policy.name)
        return output_url.format(**self.get_output_vars())

    def get_output_vars(self):
        """Variables available for interpolation into templated output urls."""
        data = {
            'account_id': self.ctx.options.account_id,
            'region': self.ctx.options.region,
            'policy_name': self.ctx.policy.name,
            'now': datetime.datetime.utcnow(),
            'uuid': str(uuid.uuid4()),
        }
        return data
class BlobOutput(DirectoryOutput):
    """Stage output in a temp directory, uploading to a blob store on exit."""

    log = logging.getLogger('custodian.output.blob')

    def __init__(self, ctx, config):
        self.ctx = ctx
        # we allow format strings in output urls so reparse config
        # post interpolation.
        self.config = parse_url_config(self.get_output_path(config['url']))
        self.bucket = self.config.netloc
        self.key_prefix = self.config.path.strip('/')
        self.root_dir = tempfile.mkdtemp()

    def __repr__(self):
        return "<output:%s to bucket:%s prefix:%s>" % (
            self.type,
            self.bucket,
            self.key_prefix,
        )

    def get_output_path(self, output_url):
        """Resolve the output url; plain urls get policy name and a UTC
        date prefix appended, templated urls are interpolated."""
        if '{' not in output_url:
            return join_output_path(
                output_url.strip('/'),
                self.ctx.policy.name,
                datetime.datetime.utcnow().strftime('%Y/%m/%d/%H')
            )
        return output_url.format(**self.get_output_vars()).rstrip('/')

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        self.log.debug("%s: uploading policy logs", self.type)
        self.compress()
        self.upload()
        shutil.rmtree(self.root_dir)
        self.log.debug("%s: policy logs uploaded", self.type)

    def upload(self):
        """Upload every staged file, keyed relative to the temp root."""
        # hoisted out of the walk loop; invariant across iterations
        len_root_dir = len(self.root_dir)
        for root, dirs, files in os.walk(self.root_dir):
            # strip the path separator so nested directories don't yield
            # double-slash object keys ('prefix//sub/file.gz')
            rel_path = root[len_root_dir:].strip('/')
            for f in files:
                key = "/".join(filter(None, [self.key_prefix, rel_path, f]))
                self.upload_file(os.path.join(root, f), key)

    def upload_file(self, path, key):
        """Transport-specific single-file upload; subclasses implement."""
        raise NotImplementedError("subclass responsibility")