Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/resources/aws.py: 35%


# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0

from c7n.provider import clouds, Provider

from collections import Counter, namedtuple
import contextlib
import copy
import datetime
import itertools
import logging
import os
import operator
import socket
import sys
import time
import threading
import traceback
from urllib import parse as urlparse
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError

import boto3

from botocore.validate import ParamValidator
from boto3.s3.transfer import S3Transfer

from c7n.credentials import SessionFactory
from c7n.config import Bag
from c7n.exceptions import InvalidOutputConfig, PolicyValidationError
from c7n.log import CloudWatchLogHandler
from c7n.utils import parse_url_config, backoff_delays

from .resource_map import ResourceMap

# Import output registries the aws provider extends.
from c7n.output import (
    api_stats_outputs,
    blob_outputs,
    log_outputs,
    metrics_outputs,
    tracer_outputs
)

# Output base implementations we extend.
from c7n.output import (
    Metrics,
    DeltaStats,
    BlobOutput,
    LogOutput,
)

from c7n.registry import PluginRegistry
from c7n import credentials, utils

log = logging.getLogger('custodian.aws')

try:
    from aws_xray_sdk.core import xray_recorder, patch
    from aws_xray_sdk.core.context import Context
    HAVE_XRAY = True
except ImportError:
    HAVE_XRAY = False

    class Context: pass  # NOQA

_profile_session = None


DEFAULT_NAMESPACE = "CloudMaid"
# Mapping of service model shape types to json schema types
MODEL_SCHEMA_TYPE_MAP = {
    'string': 'string',
    'structure': 'object',
    'list': 'array',
    'integer': 'integer',
    'boolean': 'boolean',
    'long': 'number',
    'double': 'number',
    'map': 'object'
}


def get_profile_session(options):
    global _profile_session
    if _profile_session:
        return _profile_session

    profile = getattr(options, 'profile', None)
    _profile_session = boto3.Session(profile_name=profile)
    return _profile_session


def _default_region(options):
    marker = object()
    value = getattr(options, 'regions', marker)
    if value is marker:
        return

    if len(value) > 0:
        return

    try:
        options.regions = [get_profile_session(options).region_name]
    except Exception:
        log.warning('Could not determine default region')
        options.regions = [None]

    if options.regions[0] is None:
        log.error('No default region set. Specify a default via AWS_DEFAULT_REGION '
                  'or setting a region in ~/.aws/config')
        sys.exit(1)

    log.debug("using default region:%s from boto" % options.regions[0])


def _default_account_id(options):
    if options.account_id:
        return
    elif options.assume_role:
        try:
            options.account_id = options.assume_role.split(':')[4]
            return
        except IndexError:
            pass
    try:
        session = get_profile_session(options)
        options.account_id = utils.get_account_id_from_sts(session)
    except Exception:
        options.account_id = None


def _default_bucket_region(options):
    # modify options to format s3 output urls with explicit region.
    if not options.output_dir.startswith('s3://'):
        return

    parsed = urlparse.urlparse(options.output_dir)
    s3_conf = parse_url_config(options.output_dir)
    if parsed.query and s3_conf.get("region"):
        return

    # s3 clients default to us-east-1 if no region is specified, but for partition
    # support we default to using a passed-in region if given.
    region = None
    if options.regions:
        region = options.regions[0]

    # we're operating before the expansion of the symbolic name 'all' into actual regions.
    if region == "all":
        region = None

    try:
        options.output_dir = get_bucket_url_with_region(options.output_dir, region)
    except ValueError as err:
        invalid_output = InvalidOutputConfig(str(err))
        invalid_output.__suppress_context__ = True
        raise invalid_output


def shape_validate(params, shape_name, service):
    session = fake_session()._session
    model = session.get_service_model(service)
    shape = model.shape_for(shape_name)
    validator = ParamValidator()
    report = validator.validate(params, shape)
    if report.has_errors():
        raise PolicyValidationError(report.generate_report())
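
# A minimal illustration of shape_validate: check a request body against the
# botocore service model before calling an API. The shape name
# ('CreateTagsRequest') and parameters below are assumptions for the example.
#
#   shape_validate(
#       {'Resources': ['i-0123456789abcdef0'],
#        'Tags': [{'Key': 'Env', 'Value': 'dev'}]},
#       'CreateTagsRequest', 'ec2')
#   # raises PolicyValidationError on a mismatch, returns None when valid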


def get_bucket_url_with_region(bucket_url, region):
    parsed = urlparse.urlparse(bucket_url)
    s3_conf = parse_url_config(bucket_url)
    params = {}
    if region:
        params['region_name'] = region

    client = boto3.client('s3', **params)
    region = inspect_bucket_region(s3_conf.netloc, client.meta.endpoint_url)
    if not region:
        raise ValueError(
            "could not determine region for output bucket, use explicit "
            f"?region=region_name. {s3_conf.url}")
    query = f"region={region}"
    if parsed.query:
        query = parsed.query + f"&region={region}"
    parts = parsed._replace(
        path=parsed.path.strip("/"),
        query=query
    )
    return urlparse.urlunparse(parts)
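
# For illustration (hypothetical bucket): normalize an output url so it
# carries an explicit region query parameter.
#
#   get_bucket_url_with_region('s3://my-output-bucket/logs', 'us-west-2')
#   # -> 's3://my-output-bucket/logs?region=us-west-2' (the region value is
#   #    discovered via an anonymous HEAD request against the bucket)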


def inspect_bucket_region(bucket, s3_endpoint, allow_public=False):
    """Attempt to determine a bucket region without a client

    We can make an unauthenticated HTTP HEAD request to S3 in an attempt to find a bucket's
    region. This avoids some issues with cross-account/cross-region uses of the
    GetBucketLocation or HeadBucket API action. Because bucket names are unique within
    AWS partitions, we can make requests to a single regional S3 endpoint
    and get redirected if a bucket lives in another region within the
    same partition.

    This approach is inspired by some sample code from a Go SDK issue comment,
    which @sean-zou mentioned in #7593:

    https://github.com/aws/aws-sdk-go/issues/720#issuecomment-613038544

    Return a region string, or None if we're unable to determine one.
    """
    region = None
    s3_endpoint_parts = urlparse.urlparse(s3_endpoint)
    # Use a "path-style" S3 URL here to avoid failing TLS certificate validation
    # on buckets with a dot in the name.
    #
    # According to the following blog post, before deprecating path-style
    # URLs AWS will provide a way for virtual-hosted-style URLs to handle
    # buckets with dots in their names. Using path-style URLs here in
    # the meantime seems reasonable, compared to alternatives like forcing
    # HTTP or ignoring certificate validation.
    #
    # https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
    bucket_endpoint = f'https://{s3_endpoint_parts.netloc}/{bucket}'
    request = Request(bucket_endpoint, method='HEAD')
    try:
        # For private buckets the head request will always raise an
        # http error, the status code and response headers provide
        # context for where the bucket is. For public buckets we
        # default to raising an exception as an unsuitable location, at
        # least for the output use case.
        #
        # Dynamic use of urllib trips up static analyzers because of
        # the potential to accidentally allow unexpected schemes like
        # file:/. Here we're hardcoding the https scheme, so we can
        # ignore those specific checks.
        #
        # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected # noqa
        response = url_socket_retry(urlopen, request)  # nosec B310
        # A successful response indicates a publicly accessible bucket in the same region
        region = response.headers.get('x-amz-bucket-region')

        if not allow_public:
            raise ValueError(f"bucket: '{bucket}' is publicly accessible")
    except HTTPError as err:
        # Returns 404 'Not Found' for buckets that don't exist
        if err.status == 404:
            raise ValueError(f"bucket '{bucket}' does not exist")
        # Permission errors (403) or redirects (301) for valid buckets
        # should still contain a header we can use to determine the
        # bucket region. Permission errors are indicative of the correct
        # region, while redirects are for cross region.
        region = err.headers.get('x-amz-bucket-region')

    return region
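
# Illustrative call (hypothetical bucket name): an anonymous HEAD against a
# single regional endpoint; permission errors and redirects still carry the
# x-amz-bucket-region header.
#
#   inspect_bucket_region('example-bucket', 'https://s3.us-east-1.amazonaws.com')
#   # -> 'eu-west-1' for a bucket homed in eu-west-1, or None if undetermined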


def url_socket_retry(func, *args, **kw):
    """Retry a urllib operation in the event of certain errors.

    We want to retry on some common issues for cases where we are
    connecting through an intermediary proxy or where the downstream
    is overloaded.

    socket errors
     - 104 - Connection reset by peer
     - 110 - Connection timed out

    http errors
     - 503 - Slow Down | Service Unavailable
    """
    min_delay = 1
    max_delay = 32
    max_attempts = 4

    for idx, delay in enumerate(
            backoff_delays(min_delay, max_delay, jitter=True)):
        try:
            return func(*args, **kw)
        except HTTPError as err:
            if not (err.status == 503 and 'Slow Down' in err.reason):
                raise
            if idx == max_attempts - 1:
                raise
        except URLError as err:
            if not isinstance(err.reason, socket.error):
                raise
            if err.reason.errno not in (104, 110):
                raise
            if idx == max_attempts - 1:
                raise

        time.sleep(delay)
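
# A sketch of how this wrapper composes with urlopen (hypothetical url):
# transient 503 'Slow Down' responses and connection reset/timeout socket
# errors are retried with jittered exponential backoff, up to 4 attempts.
#
#   response = url_socket_retry(
#       urlopen, Request('https://example.com', method='HEAD'))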


class Arn(namedtuple('_Arn', (
        'arn', 'partition', 'service', 'region',
        'account_id', 'resource', 'resource_type', 'separator'))):

    __slots__ = ()

    def __repr__(self):
        return "<arn:%s:%s:%s:%s:%s%s%s>" % (
            self.partition,
            self.service,
            self.region,
            self.account_id,
            self.resource_type,
            self.separator,
            self.resource)

    @classmethod
    def parse(cls, arn):
        if isinstance(arn, Arn):
            return arn
        parts = arn.split(':', 5)
        if len(parts) < 3:
            raise ValueError("Invalid Arn")
        # a few resources use qualifiers without specifying type
        if parts[2] in ('s3', 'apigateway', 'execute-api', 'emr-serverless'):
            parts.append(None)
            parts.append(None)
        elif '/' in parts[-1]:
            parts.extend(reversed(parts.pop(-1).split('/', 1)))
            parts.append('/')
        elif ':' in parts[-1]:
            parts.extend(reversed(parts.pop(-1).split(':', 1)))
            parts.append(':')
        elif len(parts) == 6:
            parts.append('')
            parts.append('')
        # replace the literal 'arn' string with the raw arn
        parts[0] = arn
        return cls(*parts)
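
# Worked example (hypothetical ARN): splitting an instance ARN into its
# named fields.
#
#   arn = Arn.parse('arn:aws:ec2:us-east-1:123456789012:instance/i-0abc1234')
#   # arn.partition == 'aws', arn.service == 'ec2',
#   # arn.resource_type == 'instance', arn.resource == 'i-0abc1234',
#   # arn.separator == '/'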


class ArnResolver:

    def __init__(self, manager):
        self.manager = manager

    def resolve(self, arns):
        arns = map(Arn.parse, arns)
        a_service = operator.attrgetter('service')
        a_resource = operator.attrgetter('resource_type')
        kfunc = lambda a: (a_service(a), a_resource(a))  # noqa
        arns = sorted(arns, key=kfunc)
        results = {}
        for (service, arn_type), arn_set in itertools.groupby(arns, key=kfunc):
            arn_set = list(arn_set)
            rtype = ArnResolver.resolve_type(arn_set[0])
            rmanager = self.manager.get_resource_manager(rtype)
            if rtype == 'sns':
                resources = rmanager.get_resources(
                    [rarn.arn for rarn in arn_set])
            else:
                resources = rmanager.get_resources(
                    [rarn.resource for rarn in arn_set])
            for rarn, r in zip(rmanager.get_arns(resources), resources):
                results[rarn] = r

            for rarn in arn_set:
                if rarn.arn not in results:
                    results[rarn.arn] = None
        return results

    @staticmethod
    def resolve_type(arn):
        arn = Arn.parse(arn)

        # this would benefit from a class cache {service} -> rtypes
        for type_name, klass in AWS.resources.items():
            if type_name in ('rest-account', 'account') or klass.resource_type.arn is False:
                continue
            if arn.service != (klass.resource_type.arn_service or klass.resource_type.service):
                continue
            if (type_name in ('asg', 'ecs-task') and
                    "%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
                    in arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_type is not None and
                    klass.resource_type.arn_type == arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_service == arn.service and
                    klass.resource_type.arn_type == ""):
                return type_name
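
# Sketch of intended use (requires a live resource manager from a policy;
# the ARN below is hypothetical):
#
#   ArnResolver.resolve_type('arn:aws:sqs:us-east-1:123456789012:my-queue')
#   # -> the registered type name whose arn metadata matches, e.g. 'sqs'
#
#   ArnResolver(manager).resolve([...])
#   # -> {arn: resource dict, or None for arns that did not resolve}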


@metrics_outputs.register('aws')
class MetricsOutput(Metrics):
    """Send metrics data to cloudwatch
    """

    permissions = ("cloudWatch:PutMetricData",)
    retry = staticmethod(utils.get_retry(('Throttling',)))

    def __init__(self, ctx, config=None):
        super(MetricsOutput, self).__init__(ctx, config)
        self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
        self.region = self.config.get('region')
        self.ignore_zero = self.config.get('ignore_zero')
        am = self.config.get('active_metrics')
        self.active_metrics = am and am.split(',')
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def _format_metric(self, key, value, unit, dimensions):
        d = {
            "MetricName": key,
            "Timestamp": datetime.datetime.utcnow(),
            "Value": value,
            "Unit": unit}
        d["Dimensions"] = [
            {"Name": "Policy", "Value": self.ctx.policy.name},
            {"Name": "ResType", "Value": self.ctx.policy.resource_type}]
        for k, v in dimensions.items():
            # Skip legacy static dimensions if using new capabilities
            if (self.destination or self.region) and k == 'Scope':
                continue
            d['Dimensions'].append({"Name": k, "Value": v})
        if self.region:
            d['Dimensions'].append(
                {'Name': 'Region', 'Value': self.ctx.options.region})
        if self.destination:
            d['Dimensions'].append(
                {'Name': 'Account', 'Value': self.ctx.options.account_id or ''})
        return d

    def _put_metrics(self, ns, metrics):
        if self.destination == 'master':
            watch = self.ctx.session_factory(
                assume=False).client('cloudwatch', region_name=self.region)
        else:
            watch = utils.local_session(
                self.ctx.session_factory).client('cloudwatch', region_name=self.region)

        # NOTE filter out metrics whose value is 0
        if self.ignore_zero in ['1', 'true', 'True']:
            metrics = [m for m in metrics if m.get("Value") != 0]
        # NOTE filter metrics data by the configured metric names
        if self.active_metrics:
            metrics = [m for m in metrics if m["MetricName"] in self.active_metrics]
        if not metrics:
            return
        return self.retry(
            watch.put_metric_data, Namespace=ns, MetricData=metrics)
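
# Illustrative configuration (the url form is an assumption for the example):
# a metrics output url such as 'aws://master?region=us-east-2' parses so that
# self.config.scheme == 'aws' and netloc == 'master', which sets
# destination='master' and publishes via an unassumed session to that region.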


@log_outputs.register('aws')
class CloudWatchLogOutput(LogOutput):

    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'

    def __init__(self, ctx, config=None):
        super(CloudWatchLogOutput, self).__init__(ctx, config)
        if self.config['netloc'] == 'master' or not self.config['netloc']:
            self.log_group = self.config['path'].strip('/')
        else:
            # join netloc to path for casual usages of aws://log/group/name
            self.log_group = ("%s/%s" % (
                self.config['netloc'], self.config['path'].strip('/'))).strip('/')
        self.region = self.config.get('region', ctx.options.region)
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def construct_stream_name(self):
        if self.config.get('stream') is None:
            log_stream = self.ctx.policy.name
            if self.config.get('region') is not None:
                log_stream = "{}/{}".format(self.ctx.options.region, log_stream)
            if self.config.get('netloc') == 'master':
                log_stream = "{}/{}".format(self.ctx.options.account_id, log_stream)
        else:
            log_stream = self.config.get('stream').format(
                region=self.ctx.options.region,
                account=self.ctx.options.account_id,
                policy=self.ctx.policy.name,
                now=datetime.datetime.utcnow())
        return log_stream

    def get_handler(self):
        log_stream = self.construct_stream_name()
        params = dict(
            log_group=self.log_group, log_stream=log_stream,
            session_factory=(
                lambda x=None: self.ctx.session_factory(
                    region=self.region, assume=self.destination != 'master')))
        return CloudWatchLogHandler(**params)

    def __repr__(self):
        return "<%s to group:%s stream:%s>" % (
            self.__class__.__name__,
            self.ctx.options.log_group,
            self.ctx.policy.name)


class XrayEmitter:
    # implement https://github.com/aws/aws-xray-sdk-python/issues/51

    def __init__(self):
        self.buf = []
        self.client = None

    def send_entity(self, entity):
        self.buf.append(entity)
        if len(self.buf) > 49:
            self.flush()

    def flush(self):
        buf = self.buf
        self.buf = []
        for segment_set in utils.chunks(buf, 50):
            self.client.put_trace_segments(
                TraceSegmentDocuments=[s.serialize() for s in segment_set])


class XrayContext(Context):
    """Specialized XRay Context for Custodian.

    A context is used as a segment storage stack for currently
    in-progress segments.

    We use a customized context for custodian as policy execution
    commonly uses a concurrent.futures threadpool pattern during
    execution for api concurrency. Default xray semantics would use
    thread local storage and treat each of those as separate trace
    executions. We want to aggregate/associate all thread pool api
    executions to the custodian policy execution. The XRay sdk supports
    this via manual code for every thread pool usage, but we don't
    want to explicitly couple xray integration everywhere across the
    codebase. Instead we use a context that is aware of custodian
    usage of threads and associates subsegments therein to the policy
    execution's active subsegment.
    """

    def __init__(self, *args, **kw):
        super(XrayContext, self).__init__(*args, **kw)
        self._local = Bag()
        self._current_subsegment = None
        self._main_tid = threading.get_ident()

    def handle_context_missing(self):
        """Custodian has a few api calls out of band of policy execution.

        - Resolving account alias.
        - Cloudwatch Log group/stream discovery/creation (when using -l on cli)

        Also we want folks to be able to opt into xray via configuration,
        so default to disabling context-missing output.
        """

    # Annotate any segments/subsegments with their thread ids.
    def put_segment(self, segment):
        if getattr(segment, 'thread_id', None) is None:
            segment.thread_id = threading.get_ident()
        super().put_segment(segment)

    def put_subsegment(self, subsegment):
        if getattr(subsegment, 'thread_id', None) is None:
            subsegment.thread_id = threading.get_ident()
        super().put_subsegment(subsegment)

    # Override since we're not just popping the end of the stack, we're removing
    # the thread's subsegment from the array by identity.
    def end_subsegment(self, end_time):
        subsegment = self.get_trace_entity()
        if self._is_subsegment(subsegment):
            subsegment.close(end_time)
            self._local.entities.remove(subsegment)
            return True
        else:
            log.warning("No subsegment to end.")
            return False

    # Override get trace identity; any worker thread will find its own subsegment
    # on the stack, else it will use the main thread's sub/segment
    def get_trace_entity(self):
        tid = threading.get_ident()
        entities = self._local.get('entities', ())
        for s in reversed(entities):
            if s.thread_id == tid:
                return s
            # custodian main thread won't advance (create new segment)
            # with worker threads still doing pool work.
            elif s.thread_id == self._main_tid:
                return s
        return self.handle_context_missing()


@tracer_outputs.register('xray', condition=HAVE_XRAY)
class XrayTracer:

    emitter = XrayEmitter()

    in_lambda = 'LAMBDA_TASK_ROOT' in os.environ
    use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ
    service_name = 'custodian'

    @classmethod
    def initialize(cls, config):
        context = XrayContext()
        sampling = config.get('sample', 'true') == 'true' and True or False
        xray_recorder.configure(
            emitter=cls.use_daemon is False and cls.emitter or None,
            context=context,
            sampling=sampling,
            context_missing='LOG_ERROR')
        patch(['boto3', 'requests'])
        logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR)

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config or {}
        self.client = None
        self.metadata = {}

    @contextlib.contextmanager
    def subsegment(self, name):
        segment = xray_recorder.begin_subsegment(name)
        try:
            yield segment
        except Exception as e:
            stack = traceback.extract_stack(limit=xray_recorder.max_trace_back)
            segment.add_exception(e, stack)
            raise
        finally:
            xray_recorder.end_subsegment(time.time())

    def __enter__(self):
        if self.client is None:
            self.client = self.ctx.session_factory(assume=False).client('xray')

        self.emitter.client = self.client

        if self.in_lambda:
            self.segment = xray_recorder.begin_subsegment(self.service_name)
        else:
            self.segment = xray_recorder.begin_segment(
                self.service_name, sampling=True)

        p = self.ctx.policy
        xray_recorder.put_annotation('policy', p.name)
        xray_recorder.put_annotation('resource', p.resource_type)
        if self.ctx.options.account_id:
            xray_recorder.put_annotation('account', self.ctx.options.account_id)

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        metadata = self.ctx.get_metadata(('api-stats',))
        metadata.update(self.metadata)
        xray_recorder.put_metadata('custodian', metadata)
        if self.in_lambda:
            xray_recorder.end_subsegment()
            return
        xray_recorder.end_segment()
        if not self.use_daemon:
            self.emitter.flush()
            log.info(
                'View XRay Trace https://console.aws.amazon.com/xray/home?region=%s#/'
                'traces/%s' % (self.ctx.options.region, self.segment.trace_id))
        self.metadata.clear()


@api_stats_outputs.register('aws')
class ApiStats(DeltaStats):

    def __init__(self, ctx, config=None):
        super(ApiStats, self).__init__(ctx, config)
        self.api_calls = Counter()

    def get_snapshot(self):
        return dict(self.api_calls)

    def get_metadata(self):
        return self.get_snapshot()

    def __enter__(self):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers((self,))
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers(())

        # With cached sessions, we need to unregister any event subscribers
        # on extant sessions to allow for the next registration.
        utils.local_session(self.ctx.session_factory).events.unregister(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

        self.ctx.metrics.put_metric(
            "ApiCalls", sum(self.api_calls.values()), "Count")
        self.pop_snapshot()

    def __call__(self, s):
        s.events.register(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

    def _record(self, http_response, parsed, model, **kwargs):
        self.api_calls["%s.%s" % (
            model.service_model.endpoint_prefix, model.name)] += 1


@blob_outputs.register('s3')
class S3Output(BlobOutput):
    """
    Usage:

    .. code-block:: python

       # config is parsed from an output url, e.g. s3://bucket/prefix
       with S3Output(ctx, config):
           log.info('xyz')  # -> log messages sent to custodian-run.log.gz

    """

    permissions = ('S3:PutObject',)

    def __init__(self, ctx, config):
        super().__init__(ctx, config)
        self._transfer = None

    @property
    def transfer(self):
        if self._transfer:
            return self._transfer
        bucket_region = self.config.region or None
        self._transfer = S3Transfer(
            self.ctx.session_factory(region=bucket_region, assume=False).client('s3'))
        return self._transfer

    def upload_file(self, path, key):
        self.transfer.upload_file(
            path, self.bucket, key,
            extra_args={
                'ACL': 'bucket-owner-full-control',
                'ServerSideEncryption': 'AES256'})


@clouds.register('aws')
class AWS(Provider):

    display_name = 'AWS'
    resource_prefix = 'aws'
    # legacy path for older plugins
    resources = PluginRegistry('resources')
    # import paths for resources
    resource_map = ResourceMap

    def initialize(self, options):
        _default_region(options)
        _default_account_id(options)
        _default_bucket_region(options)

        if options.tracer and options.tracer.startswith('xray') and HAVE_XRAY:
            XrayTracer.initialize(utils.parse_url_config(options.tracer))
        return options

    def get_session_factory(self, options):
        return SessionFactory(
            options.region,
            options.profile,
            options.assume_role,
            options.external_id,
            options.session_policy)

    def initialize_policies(self, policy_collection, options):
        """Return a set of policies targeted to the given regions.

        Supports symbolic regions like 'all'. This will automatically
        filter out policies if they are being targeted to a region that
        does not support the service. Global services will target a
        single region (us-east-1 if only 'all' is specified, else the
        first region in the list).

        Note for region partitions (govcloud and china) an explicit
        region from the partition must be passed in.
        """
        from c7n.policy import Policy, PolicyCollection
        policies = []
        service_region_map, resource_service_map = get_service_region_map(
            options.regions, policy_collection.resource_types, self.type)
        if 'all' in options.regions:
            enabled_regions = {
                r['RegionName'] for r in
                get_profile_session(options).client('ec2').describe_regions(
                    Filters=[{'Name': 'opt-in-status',
                              'Values': ['opt-in-not-required', 'opted-in']}]
                ).get('Regions')}
        for p in policy_collection:
            if 'aws.' in p.resource_type:
                _, resource_type = p.resource_type.split('.', 1)
            else:
                resource_type = p.resource_type
            available_regions = service_region_map.get(
                resource_service_map.get(resource_type), ())

            # it's a global service/endpoint, use the user provided region
            # or us-east-1.
            if not available_regions and options.regions:
                candidates = [r for r in options.regions if r != 'all']
                candidate = candidates and candidates[0] or 'us-east-1'
                svc_regions = [candidate]
            elif 'all' in options.regions:
                svc_regions = list(set(available_regions).intersection(enabled_regions))
            else:
                svc_regions = options.regions

            for region in svc_regions:
                if available_regions and region not in available_regions:
                    level = ('all' in options.regions and
                             logging.DEBUG or logging.WARNING)
                    # TODO: fixme
                    policy_collection.log.log(
                        level, "policy:%s resources:%s not available in region:%s",
                        p.name, p.resource_type, region)
                    continue
                options_copy = copy.copy(options)
                options_copy.region = str(region)

                if (len(options.regions) > 1 or 'all' in options.regions) and getattr(
                        options, 'output_dir', None):
                    options_copy.output_dir = join_output(options.output_dir, region)
                policies.append(
                    Policy(p.data, options_copy,
                           session_factory=policy_collection.session_factory()))

        return PolicyCollection(
            # order policies by region to minimize local session invalidation.
            # note relative ordering of policies must be preserved, python sort
            # is stable.
            sorted(policies, key=operator.attrgetter('options.region')),
            options)


def join_output(output_dir, suffix):
    if '{region}' in output_dir:
        return output_dir.rstrip('/')
    if output_dir.endswith('://'):
        return output_dir + suffix
    output_url_parts = urlparse.urlparse(output_dir)
    # for output urls, the end of the url may be a
    # query string. make sure we add a suffix to
    # the path component.
    output_url_parts = output_url_parts._replace(
        path=output_url_parts.path.rstrip('/') + '/%s' % suffix
    )
    return urlparse.urlunparse(output_url_parts)
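
# Worked examples (hypothetical urls): the region suffix lands on the path
# component, not the query string.
#
#   join_output('s3://bucket/prefix', 'us-east-1')
#   # -> 's3://bucket/prefix/us-east-1'
#   join_output('s3://bucket/prefix?region=us-east-1', 'us-west-2')
#   # -> 's3://bucket/prefix/us-west-2?region=us-east-1'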


def fake_session():
    session = boto3.Session(  # nosec nosemgrep
        region_name='us-east-1',
        aws_access_key_id='never',
        aws_secret_access_key='found')
    return session


def get_service_region_map(regions, resource_types, provider='aws'):
    # we're not interacting with the apis, just using the sdk meta information.
    session = fake_session()
    normalized_types = []
    for r in resource_types:
        if r.startswith('%s.' % provider):
            normalized_types.append(r[len(provider) + 1:])
        else:
            normalized_types.append(r)
    resource_service_map = {
        r: clouds[provider].resources.get(r).resource_type.service
        for r in normalized_types if r != 'account'}
    # support for govcloud, china, and iso. We only utilize these regions if they
    # are explicitly passed in on the cli.
    partition_regions = {}
    for p in ('aws-cn', 'aws-us-gov', 'aws-iso'):
        for r in session.get_available_regions('s3', partition_name=p):
            partition_regions[r] = p

    partitions = ['aws']
    for r in regions:
        if r in partition_regions:
            partitions.append(partition_regions[r])

    service_region_map = {}
    for s in set(itertools.chain(resource_service_map.values())):
        for partition in partitions:
            service_region_map.setdefault(s, []).extend(
                session.get_available_regions(s, partition_name=partition))
    return service_region_map, resource_service_map
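
# Illustrative call (hypothetical inputs): map each policy's service to the
# regions the sdk metadata reports for it, and resource types to services.
#
#   srm, rsm = get_service_region_map(['us-east-1'], ['aws.ec2'])
#   # rsm -> {'ec2': 'ec2'}
#   # srm -> {'ec2': ['af-south-1', ..., 'us-east-1', ...]}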


def shape_schema(service, shape_name, drop_fields=()):
    """Expand a shape's schema using service model shape data

    Repurpose some of the shape discovery/validation logic in
    c7n.resources.aws.shape_validate() to dynamically expand
    element schema using the latest service model shape information.

    Include available properties, their types, and enumerations of
    possible values where available.

    Args:
        service (str): The AWS service for the element. (required)
        shape_name (str): The service model request shape name. (required)
        drop_fields (Tuple[str]): List of fields to drop from the schema
            (e.g. resource_id param).
    """

    def _expand_shape_schema(shape):
        schema = {}
        for member, member_shape in shape.members.items():
            if member in drop_fields:
                continue
            _type = MODEL_SCHEMA_TYPE_MAP.get(member_shape.type_name)
            if _type is None:
                raise KeyError(f"Unknown type for {member_shape.name}: {member_shape.type_name}")
            member_schema = {'type': _type}
            if enum := getattr(member_shape, 'enum', None):
                member_schema['enum'] = enum
            if member_shape.type_name == 'structure':
                member_schema["properties"] = _expand_shape_schema(member_shape)
            elif member_shape.type_name == 'list':
                if member_shape.member.type_name == 'structure':
                    member_schema["items"] = {
                        'type': 'object',
                        'properties': _expand_shape_schema(member_shape.member)
                    }
                else:
                    member_schema["items"] = {
                        'type': MODEL_SCHEMA_TYPE_MAP.get(member_shape.member.type_name)
                    }
            elif member_shape.type_name == 'map':
                if member_shape.value.type_name in ('structure', 'list'):
                    member_schema["patternProperties"] = {
                        "^.+$": _expand_shape_schema(member_shape.value)
                    }
                else:
                    member_schema["patternProperties"] = {
                        "^.+$": {
                            'type': MODEL_SCHEMA_TYPE_MAP.get(member_shape.value.type_name)
                        }
                    }

            schema[member] = member_schema
        return schema

    session = fake_session()._session
    model = session.get_service_model(service)
    shape = model.shape_for(shape_name)

    return _expand_shape_schema(shape)

949 return _expand_shape_schema(shape)