Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/aws.py: 31%

485 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4from c7n.provider import clouds, Provider 

5 

6from collections import Counter, namedtuple 

7import contextlib 

8import copy 

9import datetime 

10import itertools 

11import logging 

12import os 

13import operator 

14import socket 

15import sys 

16import time 

17import threading 

18import traceback 

19from urllib import parse as urlparse 

20from urllib.request import urlopen, Request 

21from urllib.error import HTTPError, URLError 

22 

23import boto3 

24 

25from botocore.validate import ParamValidator 

26from boto3.s3.transfer import S3Transfer 

27 

28from c7n.credentials import SessionFactory 

29from c7n.config import Bag 

30from c7n.exceptions import InvalidOutputConfig, PolicyValidationError 

31from c7n.log import CloudWatchLogHandler 

32from c7n.utils import parse_url_config, backoff_delays 

33 

34from .resource_map import ResourceMap 

35 

36# Import output registries aws provider extends. 

37from c7n.output import ( 

38 api_stats_outputs, 

39 blob_outputs, 

40 log_outputs, 

41 metrics_outputs, 

42 tracer_outputs 

43) 

44 

45# Output base implementations we extend. 

46from c7n.output import ( 

47 Metrics, 

48 DeltaStats, 

49 BlobOutput, 

50 LogOutput, 

51) 

52 

53from c7n.registry import PluginRegistry 

54from c7n import credentials, utils 

55 

56log = logging.getLogger('custodian.aws') 

57 

# Optional X-Ray tracing support: aws_xray_sdk is not a hard dependency of
# custodian, so degrade gracefully when it is not installed. HAVE_XRAY gates
# the tracer registration below.
try:
    from aws_xray_sdk.core import xray_recorder, patch
    from aws_xray_sdk.core.context import Context
    HAVE_XRAY = True
except ImportError:
    HAVE_XRAY = False
    # Stand-in base so the XrayContext subclass below can still be defined.
    class Context: pass  # NOQA

# Process-wide cache for the locally-configured boto3 session; populated
# lazily by get_profile_session().
_profile_session = None


# Default CloudWatch namespace used for custodian-emitted metrics.
DEFAULT_NAMESPACE = "CloudMaid"

70 

71 

def get_profile_session(options):
    """Return a process-cached boto3 session for the local profile.

    The session is constructed once (using ``options.profile`` when
    present) and reused for all subsequent calls.
    """
    global _profile_session
    if not _profile_session:
        _profile_session = boto3.Session(
            profile_name=getattr(options, 'profile', None))
    return _profile_session

80 

81 

82def _default_region(options): 

83 marker = object() 

84 value = getattr(options, 'regions', marker) 

85 if value is marker: 

86 return 

87 

88 if len(value) > 0: 

89 return 

90 

91 try: 

92 options.regions = [get_profile_session(options).region_name] 

93 except Exception: 

94 log.warning('Could not determine default region') 

95 options.regions = [None] 

96 

97 if options.regions[0] is None: 

98 log.error('No default region set. Specify a default via AWS_DEFAULT_REGION ' 

99 'or setting a region in ~/.aws/config') 

100 sys.exit(1) 

101 

102 log.debug("using default region:%s from boto" % options.regions[0]) 

103 

104 

105def _default_account_id(options): 

106 if options.account_id: 

107 return 

108 elif options.assume_role: 

109 try: 

110 options.account_id = options.assume_role.split(':')[4] 

111 return 

112 except IndexError: 

113 pass 

114 try: 

115 session = get_profile_session(options) 

116 options.account_id = utils.get_account_id_from_sts(session) 

117 except Exception: 

118 options.account_id = None 

119 

120 

121def _default_bucket_region(options): 

122 # modify options to format s3 output urls with explicit region. 

123 if not options.output_dir.startswith('s3://'): 

124 return 

125 

126 parsed = urlparse.urlparse(options.output_dir) 

127 s3_conf = parse_url_config(options.output_dir) 

128 if parsed.query and s3_conf.get("region"): 

129 return 

130 

131 # s3 clients default to us-east-1 if no region is specified, but for partition 

132 # support we default to using a passed in region if given. 

133 region = None 

134 if options.regions: 

135 region = options.regions[0] 

136 

137 # we're operating pre the expansion of symbolic name all into actual regions. 

138 if region == "all": 

139 region = None 

140 

141 try: 

142 options.output_dir = get_bucket_url_with_region(options.output_dir, region) 

143 except ValueError as err: 

144 invalid_output = InvalidOutputConfig(str(err)) 

145 invalid_output.__suppress_context__ = True 

146 raise invalid_output 

147 

148 

def shape_validate(params, shape_name, service):
    """Validate params against the botocore shape for a service api.

    Raises PolicyValidationError describing every mismatch found.
    """
    botocore_session = fake_session()._session
    service_model = botocore_session.get_service_model(service)
    report = ParamValidator().validate(
        params, service_model.shape_for(shape_name))
    if report.has_errors():
        raise PolicyValidationError(report.generate_report())

157 

158 

def get_bucket_url_with_region(bucket_url, region):
    """Return bucket_url with an explicit ``region=...`` query parameter.

    The bucket's actual region is discovered via an unauthenticated HEAD
    probe; raises ValueError when it cannot be determined.
    """
    parsed = urlparse.urlparse(bucket_url)
    s3_conf = parse_url_config(bucket_url)

    client_kwargs = {'region_name': region} if region else {}
    client = boto3.client('s3', **client_kwargs)

    resolved = inspect_bucket_region(s3_conf.netloc, client.meta.endpoint_url)
    if not resolved:
        raise ValueError(f"could not determine region for output bucket, use explicit ?region=region_name. {s3_conf.url}")  # noqa

    query = f"region={resolved}"
    if parsed.query:
        query = parsed.query + f"&region={resolved}"
    return urlparse.urlunparse(
        parsed._replace(path=parsed.path.strip("/"), query=query))

178 

179 

def inspect_bucket_region(bucket, s3_endpoint, allow_public=False):
    """Attempt to determine a bucket region without a client

    We can make an unauthenticated HTTP HEAD request to S3 in an attempt to find a bucket's
    region. This avoids some issues with cross-account/cross-region uses of the
    GetBucketLocation or HeadBucket API action. Because bucket names are unique within
    AWS partitions, we can make requests to a single regional S3 endpoint
    and get redirected if a bucket lives in another region within the
    same partition.

    This approach is inspired by some sample code from a Go SDK issue comment,
    which @sean-zou mentioned in #7593:

    https://github.com/aws/aws-sdk-go/issues/720#issuecomment-613038544

    Return a region string, or None if we're unable to determine one.

    Raises ValueError when the bucket does not exist, or when it is
    publicly accessible and ``allow_public`` is False.
    """
    region = None
    s3_endpoint_parts = urlparse.urlparse(s3_endpoint)
    # Use a "path-style" S3 URL here to avoid failing TLS certificate validation
    # on buckets with a dot in the name.
    #
    # According to the following blog post, before deprecating path-style
    # URLs AWS will provide a way for virtual-hosted-style URLs to handle
    # buckets with dots in their names. Using path-style URLs here in
    # the meantime seems reasonable, compared to alternatives like forcing
    # HTTP or ignoring certificate validation.
    #
    # https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
    bucket_endpoint = f'https://{s3_endpoint_parts.netloc}/{bucket}'
    request = Request(bucket_endpoint, method='HEAD')
    try:
        # For private buckets the head request will always raise an
        # http error, the status code and response headers provide
        # context for where the bucket is. For public buckets we
        # default to raising an exception as unsuitable location at
        # least for the output use case.
        #
        # Dynamic use of urllib trips up static analyzers because of
        # the potential to accidentally allow unexpected schemes like
        # file:/. Here we're hardcoding the https scheme, so we can
        # ignore those specific checks.
        #
        # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected # noqa
        response = url_socket_retry(urlopen, request)  # nosec B310
        # Successful response indicates a public accessible bucket in the same region
        region = response.headers.get('x-amz-bucket-region')

        if not allow_public:
            # Fix: this was previously a plain string missing the f-prefix,
            # so the error message printed the literal text "{bucket}".
            raise ValueError(f"bucket: '{bucket}' is publicly accessible")
    except HTTPError as err:
        # Returns 404 'Not Found' for buckets that don't exist
        if err.status == 404:
            raise ValueError(f"bucket '{bucket}' does not exist")
        # Permission errors (403) or redirects (301) for valid buckets
        # should still contain a header we can use to determine the
        # bucket region. Permission errors are indicative of correct
        # region, while redirects are for cross region.
        region = err.headers.get('x-amz-bucket-region')

    return region

241 

242 

def url_socket_retry(func, *args, **kw):
    """Invoke a urllib operation, retrying transient failures with backoff.

    Retried conditions (common when going through intermediary proxies or
    hitting an overloaded downstream):

    socket errors
     - 104 - Connection reset by peer
     - 110 - Connection timed out

    http errors
     - 503 - Slow Down | Service Unavailable
    """
    min_delay, max_delay, max_attempts = 1, 32, 4

    for attempt, pause in enumerate(
            backoff_delays(min_delay, max_delay, jitter=True)):
        try:
            return func(*args, **kw)
        except HTTPError as err:
            # only a throttling 503 is retryable
            if err.status != 503 or 'Slow Down' not in err.reason:
                raise
            if attempt == max_attempts - 1:
                raise
        except URLError as err:
            # only retry connection reset / timeout socket errors
            if not isinstance(err.reason, socket.error):
                raise
            if err.reason.errno not in (104, 110):
                raise
            if attempt == max_attempts - 1:
                raise

        time.sleep(pause)

279 

280 

class Arn(namedtuple('_Arn', (
        'arn', 'partition', 'service', 'region',
        'account_id', 'resource', 'resource_type', 'separator'))):
    """Parsed representation of an AWS ARN.

    The ``arn`` field retains the original string; the remaining fields
    hold its colon-delimited components, with the trailing resource
    segment further split into type/separator/name when present.
    """

    __slots__ = ()

    def __repr__(self):
        return "<arn:{0.partition}:{0.service}:{0.region}:{0.account_id}:" \
               "{0.resource_type}{0.separator}{0.resource}>".format(self)

    @classmethod
    def parse(cls, arn):
        """Parse an arn string (or pass through an existing Arn)."""
        if isinstance(arn, Arn):
            return arn
        segments = arn.split(':', 5)
        if len(segments) < 3:
            raise ValueError("Invalid Arn")
        tail = segments[-1]
        # a few resources use qualifiers without specifying type
        if segments[2] in ('s3', 'apigateway', 'execute-api', 'emr-serverless'):
            segments += [None, None]
        elif '/' in tail:
            rtype, rname = tail.split('/', 1)
            segments[-1:] = [rname, rtype, '/']
        elif ':' in tail:
            rtype, rname = tail.split(':', 1)
            segments[-1:] = [rname, rtype, ':']
        elif len(segments) == 6:
            segments += ['', '']
        # replace the literal 'arn' prefix with the raw arn string
        segments[0] = arn
        return cls(*segments)

320 

321 

class ArnResolver:
    """Resolve arns back to the custodian resources they reference."""

    def __init__(self, manager):
        # manager: a resource manager, used to construct sibling managers
        # for each resource type encountered.
        self.manager = manager

    def resolve(self, arns):
        """Resolve an iterable of arns to a {arn_string: resource|None} map.

        Arns are grouped by (service, resource_type) so each group can be
        fetched through a single resource manager; arns whose resources
        can't be fetched map to None.
        """
        arns = map(Arn.parse, arns)
        a_service = operator.attrgetter('service')
        a_resource = operator.attrgetter('resource_type')
        kfunc = lambda a: (a_service(a), a_resource(a)) # noqa
        # groupby requires its input sorted on the same key
        arns = sorted(arns, key=kfunc)
        results = {}
        for (service, arn_type), arn_set in itertools.groupby(arns, key=kfunc):
            arn_set = list(arn_set)
            rtype = ArnResolver.resolve_type(arn_set[0])
            rmanager = self.manager.get_resource_manager(rtype)
            if rtype == 'sns':
                # sns resources are fetched by full arn rather than name
                resources = rmanager.get_resources(
                    [rarn.arn for rarn in arn_set])
            else:
                resources = rmanager.get_resources(
                    [rarn.resource for rarn in arn_set])
            for rarn, r in zip(rmanager.get_arns(resources), resources):
                results[rarn] = r

            # anything requested but not fetched resolves to None
            for rarn in arn_set:
                if rarn.arn not in results:
                    results[rarn.arn] = None
        return results

    @staticmethod
    def resolve_type(arn):
        """Return the custodian resource type name matching the given arn.

        Scans the AWS resource registry; returns None (implicitly) when no
        registered type matches.
        """
        arn = Arn.parse(arn)

        # this would benefit from a class cache {service} -> rtypes
        for type_name, klass in AWS.resources.items():
            if type_name in ('rest-account', 'account') or klass.resource_type.arn is False:
                continue
            if arn.service != (klass.resource_type.arn_service or klass.resource_type.service):
                continue
            # asg/ecs-task arns embed the type plus separator as a prefix of
            # the resource_type component rather than matching it exactly.
            if (type_name in ('asg', 'ecs-task') and
                "%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
                    in arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_type is not None and
                    klass.resource_type.arn_type == arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_service == arn.service and
                    klass.resource_type.arn_type == ""):
                return type_name

372 

373 

@metrics_outputs.register('aws')
class MetricsOutput(Metrics):
    """Send metrics data to cloudwatch
    """

    permissions = ("cloudWatch:PutMetricData",)
    retry = staticmethod(utils.get_retry(('Throttling',)))

    def __init__(self, ctx, config=None):
        super(MetricsOutput, self).__init__(ctx, config)
        self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
        self.region = self.config.get('region')
        self.ignore_zero = self.config.get('ignore_zero')
        active = self.config.get('active_metrics')
        self.active_metrics = active and active.split(',')
        # 'master' destination routes metrics through the unassumed session
        is_master = (self.config.scheme == 'aws'
                     and self.config.get('netloc') == 'master')
        self.destination = 'master' if is_master else None

    def _format_metric(self, key, value, unit, dimensions):
        """Build a single PutMetricData datum tagged with policy identity."""
        datum = {
            "MetricName": key,
            "Timestamp": datetime.datetime.utcnow(),
            "Value": value,
            "Unit": unit,
            "Dimensions": [
                {"Name": "Policy", "Value": self.ctx.policy.name},
                {"Name": "ResType", "Value": self.ctx.policy.resource_type}]}
        for name, val in dimensions.items():
            # Skip legacy static dimensions if using new capabilities
            if (self.destination or self.region) and name == 'Scope':
                continue
            datum['Dimensions'].append({"Name": name, "Value": val})
        if self.region:
            datum['Dimensions'].append(
                {'Name': 'Region', 'Value': self.ctx.options.region})
        if self.destination:
            datum['Dimensions'].append(
                {'Name': 'Account', 'Value': self.ctx.options.account_id or ''})
        return datum

    def _put_metrics(self, ns, metrics):
        """Publish a batch of metric data, honoring filtering config."""
        if self.destination == 'master':
            client = self.ctx.session_factory(
                assume=False).client('cloudwatch', region_name=self.region)
        else:
            client = utils.local_session(
                self.ctx.session_factory).client('cloudwatch', region_name=self.region)

        # NOTE filter out value is 0 metrics data
        if self.ignore_zero in ('1', 'true', 'True'):
            metrics = [m for m in metrics if m.get("Value") != 0]
        # NOTE filter metrics data by the metric name configured
        if self.active_metrics:
            metrics = [m for m in metrics if m["MetricName"] in self.active_metrics]
        if not metrics:
            return
        return self.retry(
            client.put_metric_data, Namespace=ns, MetricData=metrics)

433 

434 

@log_outputs.register('aws')
class CloudWatchLogOutput(LogOutput):
    """Ship custodian run logs to a cloudwatch log group/stream."""

    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'

    def __init__(self, ctx, config=None):
        super(CloudWatchLogOutput, self).__init__(ctx, config)
        netloc = self.config['netloc']
        path = self.config['path'].strip('/')
        if netloc == 'master' or not netloc:
            self.log_group = path
        else:
            # join netloc to path for casual usages of aws://log/group/name
            self.log_group = ("%s/%s" % (netloc, path)).strip('/')
        self.region = self.config.get('region', ctx.options.region)
        is_master = (self.config.scheme == 'aws'
                     and self.config.get('netloc') == 'master')
        self.destination = 'master' if is_master else None

    def construct_stream_name(self):
        """Derive the log stream name, from config template or policy name."""
        stream_template = self.config.get('stream')
        if stream_template is None:
            stream = self.ctx.policy.name
            if self.config.get('region') is not None:
                stream = "{}/{}".format(self.ctx.options.region, stream)
            if self.config.get('netloc') == 'master':
                stream = "{}/{}".format(self.ctx.options.account_id, stream)
            return stream
        return stream_template.format(
            region=self.ctx.options.region,
            account=self.ctx.options.account_id,
            policy=self.ctx.policy.name,
            now=datetime.datetime.utcnow())

    def get_handler(self):
        """Return a logging handler that writes to cloudwatch logs."""
        return CloudWatchLogHandler(
            log_group=self.log_group,
            log_stream=self.construct_stream_name(),
            session_factory=lambda x=None: self.ctx.session_factory(
                region=self.region, assume=self.destination != 'master'))

    def __repr__(self):
        return "<%s to group:%s stream:%s>" % (
            self.__class__.__name__,
            self.ctx.options.log_group,
            self.ctx.policy.name)

482 

483 

class XrayEmitter:
    """Buffering emitter that batches trace segments to the xray api.

    Implements https://github.com/aws/aws-xray-sdk-python/issues/51
    """

    def __init__(self):
        self.buf = []
        self.client = None

    def send_entity(self, entity):
        """Queue a segment, flushing once the buffer reaches 50 entries."""
        self.buf.append(entity)
        if len(self.buf) > 49:
            self.flush()

    def flush(self):
        """Ship all buffered segments in api-limit sized (50) batches."""
        pending, self.buf = self.buf, []
        for batch in utils.chunks(pending, 50):
            self.client.put_trace_segments(
                TraceSegmentDocuments=[s.serialize() for s in batch])

502 

503 

class XrayContext(Context):
    """Specialized XRay Context for Custodian.

    A context is used as a segment storage stack for currently in
    progress segments.

    We use a customized context for custodian as policy execution
    commonly uses a concurrent.futures threadpool pattern during
    execution for api concurrency. Default xray semantics would use
    thread local storage and treat each of those as separate trace
    executions. We want to aggregate/associate all thread pool api
    executions to the custodian policy execution. XRay sdk supports
    this via manual code for every thread pool usage, but we don't
    want to explicitly couple xray integration everywhere across the
    codebase. Instead we use a context that is aware of custodian
    usage of threads and associates subsegments therein to the policy
    execution active subsegment.
    """

    def __init__(self, *args, **kw):
        super(XrayContext, self).__init__(*args, **kw)
        self._local = Bag()
        # NOTE(review): _current_subsegment appears unused in this class —
        # presumably consumed by the base Context; confirm before removing.
        self._current_subsegment = None
        # Identity of the thread that started policy execution; worker
        # thread lookups fall back to this thread's entity.
        self._main_tid = threading.get_ident()

    def handle_context_missing(self):
        """Custodian has a few api calls out of band of policy execution.

        - Resolving account alias.
        - Cloudwatch Log group/stream discovery/creation (when using -l on cli)

        Also we want folks to be able to use xray optionally based on
        configuration, so default to disabling context-missing output.
        """

    # Annotate any segments/subsegments with their thread ids.
    def put_segment(self, segment):
        if getattr(segment, 'thread_id', None) is None:
            segment.thread_id = threading.get_ident()
        super().put_segment(segment)

    def put_subsegment(self, subsegment):
        if getattr(subsegment, 'thread_id', None) is None:
            subsegment.thread_id = threading.get_ident()
        super().put_subsegment(subsegment)

    # Override since we're not just popping the end of the stack, we're removing
    # the thread subsegment from the array by identity.
    def end_subsegment(self, end_time):
        subsegment = self.get_trace_entity()
        if self._is_subsegment(subsegment):
            subsegment.close(end_time)
            self._local.entities.remove(subsegment)
            return True
        else:
            log.warning("No subsegment to end.")
            return False

    # Override get trace identity, any worker thread will find its own subsegment
    # on the stack, else will use the main thread's sub/segment
    def get_trace_entity(self):
        tid = threading.get_ident()
        entities = self._local.get('entities', ())
        for s in reversed(entities):
            if s.thread_id == tid:
                return s
            # custodian main thread won't advance (create new segment)
            # with worker threads still doing pool work.
            elif s.thread_id == self._main_tid:
                return s
        return self.handle_context_missing()

575 

576 

@tracer_outputs.register('xray', condition=HAVE_XRAY)
class XrayTracer:
    """Trace policy execution to AWS X-Ray."""

    emitter = XrayEmitter()

    in_lambda = 'LAMBDA_TASK_ROOT' in os.environ
    use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ
    service_name = 'custodian'

    @classmethod
    def initialize(cls, config):
        """Configure the global xray recorder and patch boto3/requests."""
        sampling = config.get('sample', 'true') == 'true'
        xray_recorder.configure(
            # without a local daemon we ship segments via our own emitter
            emitter=None if cls.use_daemon else cls.emitter,
            context=XrayContext(),
            sampling=sampling,
            context_missing='LOG_ERROR')
        patch(['boto3', 'requests'])
        logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR)

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config or {}
        self.client = None
        self.metadata = {}

    @contextlib.contextmanager
    def subsegment(self, name):
        """Wrap a unit of work in a named xray subsegment."""
        segment = xray_recorder.begin_subsegment(name)
        try:
            yield segment
        except Exception as e:
            # attach the exception and stack trace before propagating
            segment.add_exception(
                e, traceback.extract_stack(limit=xray_recorder.max_trace_back))
            raise
        finally:
            xray_recorder.end_subsegment(time.time())

    def __enter__(self):
        """Open the policy trace segment and annotate policy identity."""
        if self.client is None:
            self.client = self.ctx.session_factory(assume=False).client('xray')

        self.emitter.client = self.client

        if self.in_lambda:
            self.segment = xray_recorder.begin_subsegment(self.service_name)
        else:
            self.segment = xray_recorder.begin_segment(
                self.service_name, sampling=True)

        policy = self.ctx.policy
        xray_recorder.put_annotation('policy', policy.name)
        xray_recorder.put_annotation('resource', policy.resource_type)
        if self.ctx.options.account_id:
            xray_recorder.put_annotation('account', self.ctx.options.account_id)

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Close the trace segment, flushing buffered segments when needed."""
        metadata = self.ctx.get_metadata(('api-stats',))
        metadata.update(self.metadata)
        xray_recorder.put_metadata('custodian', metadata)
        if self.in_lambda:
            xray_recorder.end_subsegment()
            return
        xray_recorder.end_segment()
        if not self.use_daemon:
            self.emitter.flush()
            log.info(
                ('View XRay Trace https://console.aws.amazon.com/xray/home?region=%s#/'
                 'traces/%s' % (self.ctx.options.region, self.segment.trace_id)))
        self.metadata.clear()

648 

649 

@api_stats_outputs.register('aws')
class ApiStats(DeltaStats):
    """Track aws api call counts made during a policy execution."""

    def __init__(self, ctx, config=None):
        super(ApiStats, self).__init__(ctx, config)
        self.api_calls = Counter()

    def get_snapshot(self):
        """Return the current call counts as a plain dict."""
        return dict(self.api_calls)

    def get_metadata(self):
        return self.get_snapshot()

    def __enter__(self):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers((self,))
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers(())

        # With cached sessions, we need to unregister any events subscribers
        # on extant sessions to allow for the next registration.
        utils.local_session(self.ctx.session_factory).events.unregister(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

        self.ctx.metrics.put_metric(
            "ApiCalls", sum(self.api_calls.values()), "Count")
        self.pop_snapshot()

    def __call__(self, s):
        # session factory subscriber hook: attach our recorder to the session
        s.events.register(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

    def _record(self, http_response, parsed, model, **kwargs):
        # count calls per service endpoint / operation
        call_key = "%s.%s" % (
            model.service_model.endpoint_prefix, model.name)
        self.api_calls[call_key] += 1

689 

@blob_outputs.register('s3')
class S3Output(BlobOutput):
    """
    Usage:

    .. code-block:: python

       with S3Output(session_factory, 's3://bucket/prefix'):
           log.info('xyz')  # -> log messages sent to custodian-run.log.gz

    """

    permissions = ('S3:PutObject',)

    def __init__(self, ctx, config):
        super().__init__(ctx, config)
        self._transfer = None

    @property
    def transfer(self):
        # lazily construct the transfer manager against the bucket's region
        if not self._transfer:
            bucket_region = self.config.region or None
            s3_client = self.ctx.session_factory(
                region=bucket_region, assume=False).client('s3')
            self._transfer = S3Transfer(s3_client)
        return self._transfer

    def upload_file(self, path, key):
        """Upload a local file, owner-controlled and encrypted at rest."""
        self.transfer.upload_file(
            path, self.bucket, key,
            extra_args={
                'ACL': 'bucket-owner-full-control',
                'ServerSideEncryption': 'AES256'})

723 

724 

@clouds.register('aws')
class AWS(Provider):
    # Provider implementation registered under the 'aws' cloud name.

    display_name = 'AWS'
    resource_prefix = 'aws'
    # legacy path for older plugins
    resources = PluginRegistry('resources')
    # import paths for resources
    resource_map = ResourceMap

    def initialize(self, options):
        """Apply aws defaults (region, account id, output bucket region)
        to the execution options, and initialize xray tracing if configured.
        """
        _default_region(options)
        _default_account_id(options)
        _default_bucket_region(options)

        if options.tracer and options.tracer.startswith('xray') and HAVE_XRAY:
            XrayTracer.initialize(utils.parse_url_config(options.tracer))
        return options

    def get_session_factory(self, options):
        """Return a boto session factory bound to the options' credentials."""
        return SessionFactory(
            options.region,
            options.profile,
            options.assume_role,
            options.external_id)

    def initialize_policies(self, policy_collection, options):
        """Return a set of policies targetted to the given regions.

        Supports symbolic regions like 'all'. This will automatically
        filter out policies if they are being targetted to a region that
        does not support the service. Global services will target a
        single region (us-east-1 if only all specified, else first
        region in the list).

        Note for region partitions (govcloud and china) an explicit
        region from the partition must be passed in.
        """
        from c7n.policy import Policy, PolicyCollection
        policies = []
        service_region_map, resource_service_map = get_service_region_map(
            options.regions, policy_collection.resource_types, self.type)
        if 'all' in options.regions:
            # expand symbolic 'all' to the account's enabled regions
            enabled_regions = {
                r['RegionName'] for r in
                get_profile_session(options).client('ec2').describe_regions(
                    Filters=[{'Name': 'opt-in-status',
                              'Values': ['opt-in-not-required', 'opted-in']}]
                ).get('Regions')}
        for p in policy_collection:
            if 'aws.' in p.resource_type:
                _, resource_type = p.resource_type.split('.', 1)
            else:
                resource_type = p.resource_type
            available_regions = service_region_map.get(
                resource_service_map.get(resource_type), ())

            # its a global service/endpoint, use user provided region
            # or us-east-1.
            if not available_regions and options.regions:
                candidates = [r for r in options.regions if r != 'all']
                candidate = candidates and candidates[0] or 'us-east-1'
                svc_regions = [candidate]
            elif 'all' in options.regions:
                svc_regions = list(set(available_regions).intersection(enabled_regions))
            else:
                svc_regions = options.regions

            for region in svc_regions:
                if available_regions and region not in available_regions:
                    # demote to debug when 'all' was requested, since the
                    # user didn't explicitly target the unavailable region
                    level = ('all' in options.regions and
                             logging.DEBUG or logging.WARNING)
                    # TODO: fixme
                    policy_collection.log.log(
                        level, "policy:%s resources:%s not available in region:%s",
                        p.name, p.resource_type, region)
                    continue
                options_copy = copy.copy(options)
                options_copy.region = str(region)

                # NOTE(review): precedence here reads as
                # `len(...) > 1 or ('all' in regions and output_dir set)`;
                # presumably the output_dir guard was meant to apply to the
                # multi-region case too — confirm before changing.
                if len(options.regions) > 1 or 'all' in options.regions and getattr(
                        options, 'output_dir', None):
                    options_copy.output_dir = join_output(options.output_dir, region)
                policies.append(
                    Policy(p.data, options_copy,
                           session_factory=policy_collection.session_factory()))

        return PolicyCollection(
            # order policies by region to minimize local session invalidation.
            # note relative ordering of policies must be preserved, python sort
            # is stable.
            sorted(policies, key=operator.attrgetter('options.region')),
            options)

820 

821 

def join_output(output_dir, suffix):
    """Append a per-region suffix to an output location.

    Locations templated with '{region}' are returned unchanged (minus any
    trailing slash); bare scheme urls ('s3://') get the suffix appended
    directly; otherwise the suffix joins the url's path component,
    preserving any query string.
    """
    if '{region}' in output_dir:
        return output_dir.rstrip('/')
    if output_dir.endswith('://'):
        return output_dir + suffix
    parts = urlparse.urlparse(output_dir)
    # for output urls, the end of the url may be a query string; make
    # sure we add the suffix to the path component.
    parts = parts._replace(
        path="%s/%s" % (parts.path.rstrip('/'), suffix))
    return urlparse.urlunparse(parts)

835 

836 

def fake_session():
    """Return a boto session with placeholder credentials.

    Used only to introspect sdk metadata (service models, region tables);
    never for actual api calls.
    """
    return boto3.Session(  # nosec nosemgrep
        region_name='us-east-1',
        aws_access_key_id='never',
        aws_secret_access_key='found')

843 

844 

def get_service_region_map(regions, resource_types, provider='aws'):
    """Map services to their available regions using sdk metadata only.

    Returns a (service -> [regions], resource_type -> service) pair; no
    api calls are made.
    """
    session = fake_session()

    prefix = '%s.' % provider
    normalized_types = [
        r[len(prefix):] if r.startswith(prefix) else r
        for r in resource_types]
    resource_service_map = {
        r: clouds[provider].resources.get(r).resource_type.service
        for r in normalized_types if r != 'account'}

    # support for govcloud, china, and iso. We only utilize these regions if they
    # are explicitly passed in on the cli.
    partition_regions = {}
    for partition_name in ('aws-cn', 'aws-us-gov', 'aws-iso'):
        for region in session.get_available_regions('s3', partition_name=partition_name):
            partition_regions[region] = partition_name

    partitions = ['aws']
    for region in regions:
        if region in partition_regions:
            partitions.append(partition_regions[region])

    service_region_map = {}
    for service in set(resource_service_map.values()):
        for partition in partitions:
            service_region_map.setdefault(service, []).extend(
                session.get_available_regions(service, partition_name=partition))
    return service_region_map, resource_service_map