Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/output.py: 43%

314 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3""" 

4Outputs metrics, logs, stats, traces, and structured records across 

5a variety of sinks. 

6 

7See docs/usage/outputs.rst 

8 

9""" 

10import contextlib 

11import datetime 

12import gzip 

13import logging 

14import os 

15import shutil 

16import tempfile 

17import time 

18import uuid 

19 

20from abc import ABC, abstractmethod 

21 

22from c7n.exceptions import InvalidOutputConfig 

23from c7n.registry import PluginRegistry 

24from c7n.utils import parse_url_config, join_output_path 

25 

# Optional dependency: psutil powers the SystemStats collector below.
# HAVE_PSUTIL gates its registration so the module works without it.
try:
    import psutil

    HAVE_PSUTIL = True
except ImportError:
    HAVE_PSUTIL = False

# Module-level logger for output-sink diagnostics.
log = logging.getLogger('custodian.output')


# TODO remove
# Legacy default metrics namespace, retained for backwards compatibility.
DEFAULT_NAMESPACE = "CloudMaid"

38 

39 

class OutputRegistry(PluginRegistry):
    """Registry of output sinks, selectable by a URL-style scheme string."""

    default_protocol = None

    def select(self, selector, ctx):
        """Instantiate the registered output matching *selector*.

        An empty selector falls back to the 'default' entry. Bare names
        are normalized into ``scheme://`` form before prefix matching.
        Raises InvalidOutputConfig when nothing matches.
        """
        if not selector:
            return self['default'](ctx, {'url': selector})
        if '://' not in selector:
            if selector in self:
                # Bare registered name, e.g. "aws" -> "aws://"
                selector = "{}://".format(selector)
            elif self.default_protocol:
                # Bare path/host; prepend the registry's default scheme.
                selector = "{}://{}".format(self.default_protocol, selector)
        for prefix in self.keys():
            if selector.startswith(prefix):
                return self[prefix](ctx, parse_url_config(selector))
        raise InvalidOutputConfig("Invalid %s: %s" % (
            self.plugin_type,
            selector))

58 

59 

class BlobOutputRegistry(OutputRegistry):
    """Blob/object-store outputs; bare paths default to the file scheme."""

    default_protocol = "file"

63 

64 

class LogOutputRegistry(OutputRegistry):
    """Log delivery outputs; bare selectors default to the aws scheme."""

    default_protocol = "aws"

68 

69 

class MetricsRegistry(OutputRegistry):
    """Metrics output registry with legacy boolean configuration support."""

    def select(self, selector, ctx):
        # Older configs specified `metrics: true`; map that onto 'aws'.
        if selector is True:
            selector = 'aws'
        return super().select(selector, ctx)

77 

78 

# Singleton registries, one per output concern; cloud providers register
# their concrete implementations against these at import time.
api_stats_outputs = OutputRegistry('c7n.output.api_stats')
blob_outputs = BlobOutputRegistry('c7n.output.blob')
log_outputs = LogOutputRegistry('c7n.output.logs')
metrics_outputs = MetricsRegistry('c7n.output.metrics')
tracer_outputs = OutputRegistry('c7n.output.tracer')
sys_stats_outputs = OutputRegistry('c7n.output.sys_stats')

85 

86 

@tracer_outputs.register('default')
class NullTracer:
    """No-op tracer; detailed policy-execution analytics are disabled.

    Concrete tracers use native cloud provider integration
    (xray, stack driver trace).
    """

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}

    @contextlib.contextmanager
    def subsegment(self, name):
        """Create a named subsegment as a context manager (no-op here)."""
        yield self

    def __enter__(self):
        """Enter main segment for policy execution."""

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Exit main segment for policy execution."""

110 

111 

class DeltaStats:
    """Capture stats (dictionary of string->integer) as a stack.

    Popping the stack automatically creates a delta of the last
    stack element to the current stats.
    """

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}
        self.snapshot_stack = []

    def push_snapshot(self):
        # Record the current counters so a later pop can diff against them.
        self.snapshot_stack.append(self.get_snapshot())

    def pop_snapshot(self):
        # Diff the most recent snapshot against the present counters.
        previous = self.snapshot_stack.pop()
        return self.delta(previous, self.get_snapshot())

    def get_snapshot(self):
        # Subclasses supply the actual counter collection.
        return {}

    def delta(self, before, after):
        # Keep only the counters that actually moved between snapshots.
        return {
            key: after[key] - before[key]
            for key in before
            if after[key] - before[key]
        }

140 

141 

@sys_stats_outputs.register('default')
@api_stats_outputs.register('default')
class NullStats:
    """No-op execution statistics/metrics collection.

    Concrete implementations cover system stats (memory, cpu, cache size)
    and api calls; this default collects nothing.

    The api supports stack nested snapshots, with delta consumption to
    support tracing metadata annotation across nested subsegments.
    """

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}

    def push_snapshot(self):
        """Take a snapshot of the system stats and append to the stack."""

    def pop_snapshot(self):
        """Remove a snapshot from the stack and return a delta of the
        current stats to it."""
        return {}

    def get_metadata(self):
        """Return delta of current to last snapshot, without popping."""
        return {}

    def __enter__(self):
        """Push a snapshot."""

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Pop a snapshot."""

178 

179 

@sys_stats_outputs.register('psutil', condition=HAVE_PSUTIL)
class SystemStats(DeltaStats):
    """Collect process statistics via psutil as deltas over policy execution.
    """
    def __init__(self, ctx, config=None):
        super(SystemStats, self).__init__(ctx, config)
        self.process = psutil.Process(os.getpid())

    def __enter__(self):
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        # Bug fix: the context-manager protocol always passes three
        # exception arguments to __exit__; the previous zero-argument
        # signature raised TypeError whenever an instance was used in a
        # `with` block (cf. NullStats.__exit__ in this module).
        self.pop_snapshot()

    def get_metadata(self):
        """Return delta of current stats to the last snapshot (without
        popping), or the raw snapshot if none was pushed."""
        if self.snapshot_stack:
            return self.delta(self.snapshot_stack[-1], self.get_snapshot())
        return self.get_snapshot()

    def get_snapshot(self):
        """Collect a flat dict of numeric process counters from psutil."""
        snapshot = {
            'num_threads': self.process.num_threads(),
            'snapshot_time': time.time(),
            'cache_size': self.ctx.policy.get_cache().size()
        }

        # no num_fds on Windows, but likely num_handles
        if hasattr(self.process, "num_fds"):
            snapshot['num_fds'] = self.process.num_fds()
        elif hasattr(self.process, "num_handles"):
            snapshot['num_handles'] = self.process.num_handles()

        with self.process.oneshot():
            # simpler would be json.dumps(self.process.as_dict()), but
            # that complicates delta diffing between snapshots.
            cpu_time = self.process.cpu_times()
            snapshot['cpu_user'] = cpu_time.user
            snapshot['cpu_system'] = cpu_time.system
            (snapshot['num_ctx_switches_voluntary'],
             snapshot['num_ctx_switches_involuntary']) = self.process.num_ctx_switches()
            # io counters (not available on osx)
            if getattr(self.process, 'io_counters', None):
                try:
                    io = self.process.io_counters()
                    for counter in (
                            'read_count', 'write_count',
                            'write_bytes', 'read_bytes'):
                        snapshot[counter] = getattr(io, counter)
                except NotImplementedError:
                    # some old kernels and Windows Linux Subsystem throw this
                    pass
            # memory counters; available fields are platform dependent,
            # so only record the ones this platform exposes.
            mem = self.process.memory_info()
            for counter in (
                    'rss', 'vms', 'shared', 'text', 'data', 'lib',
                    'pfaults', 'pageins'):
                v = getattr(mem, counter, None)
                if v is not None:
                    snapshot[counter] = v
        return snapshot

240 

241 

class Metrics:
    """Buffered metrics sink base class.

    Subclasses implement formatting and delivery; datapoints accumulate
    in an internal buffer and are flushed in batches.
    """

    permissions = ()
    namespace = DEFAULT_NAMESPACE
    # Max datapoints delivered per backend request.
    BUFFER_SIZE = 20

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config
        self.buf = []

    def _format_metric(self, key, value, unit, dimensions):
        raise NotImplementedError("subclass responsiblity")

    def _put_metrics(self, ns, metrics):
        raise NotImplementedError("subclass responsiblity")

    def flush(self):
        """Deliver any buffered datapoints and reset the buffer."""
        if not self.buf:
            return
        self._put_metrics(self.namespace, self.buf)
        self.buf = []

    def put_metric(self, key, value, unit, buffer=True, **dimensions):
        """Record one datapoint; flush immediately when unbuffered,
        otherwise when the buffer reaches BUFFER_SIZE."""
        self.buf.append(self._format_metric(key, value, unit, dimensions))
        if not buffer or len(self.buf) >= self.BUFFER_SIZE:
            self.flush()

    def get_metadata(self):
        # Shallow copy so callers can't mutate the live buffer.
        return list(self.buf)

276 

277 

@metrics_outputs.register('default')
class LogMetrics(Metrics):
    """Default metrics collection.

    logs metrics, default handler should send to stderr
    """

    def _put_metrics(self, ns, metrics):
        # Timing metrics are high-volume noise at debug level; skip them.
        for metric in metrics:
            if metric['MetricName'] in ('ActionTime', 'ResourceTime'):
                continue
            log.debug(self.render_metric(metric))

    def render_metric(self, m):
        """Render a datapoint dict as a single human-readable line."""
        parts = ["metric:%s %s:%s" % (m['MetricName'], m['Unit'], m['Value'])]
        parts.extend(
            " %s:%s" % (d['Name'].lower(), d['Value'].lower())
            for d in m['Dimensions'])
        return "".join(parts)

    def _format_metric(self, key, value, unit, dimensions):
        """Shape a datapoint in CloudWatch put-metric-data form."""
        point = {
            "MetricName": key,
            "Timestamp": datetime.datetime.now(),
            "Value": value,
            "Unit": unit,
            "Dimensions": [
                {"Name": "Policy", "Value": self.ctx.policy.name},
                {"Name": "ResType", "Value": self.ctx.policy.resource_type},
            ],
        }
        for name, val in dimensions.items():
            point['Dimensions'].append({"Name": name, "Value": val})
        return point

    def get_metadata(self):
        # Dimensions duplicate policy metadata, so drop them here.
        return [
            {k: v for k, v in point.items() if k != 'Dimensions'}
            for point in self.buf
        ]

315 

316 

class LogOutput:
    """Attach a log handler to the custodian logger for a policy run.

    Used as a context manager: entering installs the handler, exiting
    removes and closes it.
    """

    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    def __init__(self, ctx, config=None):
        self.ctx = ctx
        self.config = config or {}
        self.handler = None

    def get_handler(self):
        # Subclasses return a configured logging.Handler, or None to disable.
        raise NotImplementedError()

    def __enter__(self):
        log.debug("Storing output with %s" % repr(self))
        self.join_log()
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if exc_type is not None:
            log.exception("Error while executing policy")
        self.leave_log()

    def join_log(self):
        """Install this output's handler on the custodian logger."""
        handler = self.get_handler()
        self.handler = handler
        if handler is None:
            return
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(logging.Formatter(self.log_format))
        logging.getLogger('custodian').addHandler(handler)

    def leave_log(self):
        """Detach the handler and release its resources."""
        handler = self.handler
        if handler is None:
            return
        logging.getLogger('custodian').removeHandler(handler)
        handler.flush()
        handler.close()

355 

356 

@log_outputs.register('default')
class LogFile(LogOutput):
    """Write the policy execution log to a file in the output directory."""

    def __repr__(self):
        return "<LogFile file://%s>" % self.log_path

    @property
    def log_path(self):
        # Conventional run-log name inside the context's log directory.
        return os.path.join(self.ctx.log_dir, 'custodian-run.log')

    def get_handler(self):
        return logging.FileHandler(self.log_path)

370 

371 

@log_outputs.register('null')
class NullLog(LogOutput):
    """Discard log output; the default used by unit tests."""

    def __repr__(self):
        return "<Null Log>"

    @property
    def log_path(self):
        # Dummy path; nothing is ever written here.
        return "xyz/log.txt"

    def get_handler(self):
        # Returning None makes join_log/leave_log no-ops.
        return None

385 

386 

class OutputFileHandler(ABC):
    """Base class for types registered with the blob_outputs registry.

    Provides explicit interface definition for the types.

    The file handlers are treated as context managers.
    """

    # Injected by the register method, matches the string type passed.
    type: str
    # The base directory that will hold the output files.
    root_dir: str

    @abstractmethod
    def __enter__(self):
        raise NotImplementedError()

    @abstractmethod
    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        raise NotImplementedError()

    @abstractmethod
    def write_file(self, rel_path, value):
        """Write a file at the relative path specified with the value as
        the content."""
        raise NotImplementedError()

410 

411 

@blob_outputs.register('null')
class NullBlobOutput(OutputFileHandler):
    """Discard all record output; the default used by unit tests."""

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config
        self.root_dir = 'xyz'

    def __repr__(self):
        return "<null blob output>"

    def __enter__(self):
        """A no-op for the null handler."""

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """A no-op for the null handler."""

    def write_file(self, rel_path, value):
        """A no-op for the null handler."""

432 

433 

@blob_outputs.register('file')
@blob_outputs.register('default')
class DirectoryOutput(OutputFileHandler):
    """Write policy output records into a local directory tree."""

    permissions = ()

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config

        output_path = self.get_output_path(config['url'])
        if output_path.startswith('file://'):
            output_path = output_path[len('file://'):]

        self.root_dir = output_path
        if self.root_dir:
            # Bug fix: the old `if not exists: makedirs` pattern raced with
            # concurrent executions sharing an output directory (TOCTOU);
            # exist_ok makes directory creation idempotent and race-free.
            os.makedirs(self.root_dir, exist_ok=True)

    def __enter__(self):
        return

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        return

    def __repr__(self):
        return "<%s to dir:%s>" % (self.__class__.__name__, self.root_dir)

    def write_file(self, rel_path, value):
        """Write *value* to *rel_path* beneath the output root."""
        with open(os.path.join(self.root_dir, rel_path), 'w') as fh:
            fh.write(value)

    def compress(self):
        """Gzip each output file individually, removing the originals.

        Compress files individually so it's easy to walk them, without
        downloading a tar and extracting.
        """
        for root, dirs, files in os.walk(self.root_dir):
            for f in files:
                fp = os.path.join(root, f)
                with gzip.open(fp + ".gz", "wb", compresslevel=7) as zfh:
                    with open(fp, "rb") as sfh:
                        shutil.copyfileobj(sfh, zfh, length=2**15)
                os.remove(fp)

    def get_output_path(self, output_url):
        """Resolve the output url; plain urls get the policy name appended,
        templated urls ({...}) are interpolated with get_output_vars."""
        if '{' not in output_url:
            return os.path.join(output_url, self.ctx.policy.name)
        return output_url.format(**self.get_output_vars())

    def get_output_vars(self):
        """Variables available for interpolation in templated output urls."""
        return {
            'account_id': self.ctx.options.account_id,
            'region': self.ctx.options.region,
            'policy_name': self.ctx.policy.name,
            'now': datetime.datetime.utcnow(),
            'uuid': str(uuid.uuid4()),
        }

490 

491 

class BlobOutput(DirectoryOutput):
    """Stage output locally, then upload to a blob store on exit.

    Records are written to a temp directory, compressed, and uploaded
    under bucket/key_prefix by a provider-specific subclass.
    """

    log = logging.getLogger('custodian.output.blob')

    def __init__(self, ctx, config):
        self.ctx = ctx
        # we allow format strings in output urls so reparse config
        # post interpolation.
        self.config = parse_url_config(self.get_output_path(config['url']))
        self.bucket = self.config.netloc
        self.key_prefix = self.config.path.strip('/')
        # Stage files locally; uploaded and removed on __exit__.
        self.root_dir = tempfile.mkdtemp()

    def __repr__(self):
        return "<output:%s to bucket:%s prefix:%s>" % (
            self.type,
            self.bucket,
            self.key_prefix,
        )

    def get_output_path(self, output_url):
        """Resolve the destination url; plain urls get a policy-name and
        date-partitioned suffix, templated urls are interpolated."""
        if '{' not in output_url:
            return join_output_path(
                output_url.strip('/'),
                self.ctx.policy.name,
                datetime.datetime.utcnow().strftime('%Y/%m/%d/%H')
            )
        return output_url.format(**self.get_output_vars()).rstrip('/')

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        self.log.debug("%s: uploading policy logs", self.type)
        self.compress()
        self.upload()
        shutil.rmtree(self.root_dir)
        self.log.debug("%s: policy logs uploaded", self.type)

    def upload(self):
        """Upload every staged file, preserving the relative layout."""
        # Perf fix: the prefix length is invariant, and the relative path
        # only changes per directory; both were previously recomputed
        # inside the walk / per-file loops.
        len_root_dir = len(self.root_dir)
        for root, dirs, files in os.walk(self.root_dir):
            rel_path = root[len_root_dir:]
            for f in files:
                key = "/".join(filter(None, [self.key_prefix, rel_path, f]))
                self.upload_file(os.path.join(root, f), key)

    def upload_file(self, path, key):
        """Upload a single staged file to *key*; provider subclasses implement."""
        raise NotImplementedError("subclass responsibility")