1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
3
4from c7n.provider import clouds, Provider
5
6from collections import Counter, namedtuple
7import contextlib
8import copy
9import datetime
10import itertools
11import logging
12import os
13import operator
14import socket
15import sys
16import time
17import threading
18import traceback
19from urllib import parse as urlparse
20from urllib.request import urlopen, Request
21from urllib.error import HTTPError, URLError
22
23import boto3
24
25from botocore.validate import ParamValidator
26from boto3.s3.transfer import S3Transfer
27
28from c7n.credentials import SessionFactory
29from c7n.config import Bag
30from c7n.exceptions import InvalidOutputConfig, PolicyValidationError
31from c7n.log import CloudWatchLogHandler
32from c7n.utils import parse_url_config, backoff_delays
33
34from .resource_map import ResourceMap
35
36# Import output registries aws provider extends.
37from c7n.output import (
38 api_stats_outputs,
39 blob_outputs,
40 log_outputs,
41 metrics_outputs,
42 tracer_outputs
43)
44
45# Output base implementations we extend.
46from c7n.output import (
47 Metrics,
48 DeltaStats,
49 BlobOutput,
50 LogOutput,
51)
52
53from c7n.registry import PluginRegistry
54from c7n import credentials, utils
55
56log = logging.getLogger('custodian.aws')
57
58try:
59 from aws_xray_sdk.core import xray_recorder, patch
60 from aws_xray_sdk.core.context import Context
61 HAVE_XRAY = True
62except ImportError:
63 HAVE_XRAY = False
64 class Context: pass # NOQA
65
66_profile_session = None
67
68
69DEFAULT_NAMESPACE = "CloudMaid"
70# Mapping of service model shape types to json schema types
71MODEL_SCHEMA_TYPE_MAP = {
72 'string': 'string',
73 'structure': 'object',
74 'list': 'array',
75 'integer': 'integer',
76 'boolean': 'boolean',
77 'long': 'number',
78 'double': 'number',
79 'map': 'object'
80}
81
82
83def get_profile_session(options):
84 global _profile_session
85 if _profile_session:
86 return _profile_session
87
88 profile = getattr(options, 'profile', None)
89 _profile_session = boto3.Session(profile_name=profile)
90 return _profile_session
91
92
93def _default_region(options):
94 marker = object()
95 value = getattr(options, 'regions', marker)
96 if value is marker:
97 return
98
99 if len(value) > 0:
100 return
101
102 try:
103 options.regions = [get_profile_session(options).region_name]
104 except Exception:
105 log.warning('Could not determine default region')
106 options.regions = [None]
107
108 if options.regions[0] is None:
109 log.error('No default region set. Specify a default via AWS_DEFAULT_REGION '
110 'or setting a region in ~/.aws/config')
111 sys.exit(1)
112
113 log.debug("using default region:%s from boto" % options.regions[0])
114
115
116def _default_account_id(options):
117 if options.account_id:
118 return
119 elif options.assume_role:
120 try:
121 options.account_id = options.assume_role.split(':')[4]
122 return
123 except IndexError:
124 pass
125 try:
126 session = get_profile_session(options)
127 options.account_id = utils.get_account_id_from_sts(session)
128 except Exception:
129 options.account_id = None
130
131
132def _default_bucket_region(options):
133 # modify options to format s3 output urls with explicit region.
134 if not options.output_dir.startswith('s3://'):
135 return
136
137 parsed = urlparse.urlparse(options.output_dir)
138 s3_conf = parse_url_config(options.output_dir)
139 if parsed.query and s3_conf.get("region"):
140 return
141
142 # s3 clients default to us-east-1 if no region is specified, but for partition
143 # support we default to using a passed in region if given.
144 region = None
145 if options.regions:
146 region = options.regions[0]
147
148 # we're operating pre the expansion of symbolic name all into actual regions.
149 if region == "all":
150 region = None
151
152 try:
153 options.output_dir = get_bucket_url_with_region(options.output_dir, region)
154 except ValueError as err:
155 invalid_output = InvalidOutputConfig(str(err))
156 invalid_output.__suppress_context__ = True
157 raise invalid_output
158
159
160def shape_validate(params, shape_name, service):
161 session = fake_session()._session
162 model = session.get_service_model(service)
163 shape = model.shape_for(shape_name)
164 validator = ParamValidator()
165 report = validator.validate(params, shape)
166 if report.has_errors():
167 raise PolicyValidationError(report.generate_report())
168
169
170def get_bucket_url_with_region(bucket_url, region):
171 parsed = urlparse.urlparse(bucket_url)
172 s3_conf = parse_url_config(bucket_url)
173 params = {}
174 if region:
175 params['region_name'] = region
176
177 client = boto3.client('s3', **params)
178 region = inspect_bucket_region(s3_conf.netloc, client.meta.endpoint_url)
179 if not region:
180 raise ValueError(f"could not determine region for output bucket, use explicit ?region=region_name. {s3_conf.url}") # noqa
181 query = f"region={region}"
182 if parsed.query:
183 query = parsed.query + f"®ion={region}"
184 parts = parsed._replace(
185 path=parsed.path.strip("/"),
186 query=query
187 )
188 return urlparse.urlunparse(parts)
189
190
191def inspect_bucket_region(bucket, s3_endpoint, allow_public=False):
192 """Attempt to determine a bucket region without a client
193
194 We can make an unauthenticated HTTP HEAD request to S3 in an attempt to find a bucket's
195 region. This avoids some issues with cross-account/cross-region uses of the
196 GetBucketLocation or HeadBucket API action. Because bucket names are unique within
197 AWS partitions, we can make requests to a single regional S3 endpoint
198 and get redirected if a bucket lives in another region within the
199 same partition.
200
201 This approach is inspired by some sample code from a Go SDK issue comment,
202 which @sean-zou mentioned in #7593:
203
204 https://github.com/aws/aws-sdk-go/issues/720#issuecomment-613038544
205
206 Return a region string, or None if we're unable to determine one.
207 """
208 region = None
209 s3_endpoint_parts = urlparse.urlparse(s3_endpoint)
210 # Use a "path-style" S3 URL here to avoid failing TLS certificate validation
211 # on buckets with a dot in the name.
212 #
213 # According to the following blog post, before deprecating path-style
214 # URLs AWS will provide a way for virtual-hosted-style URLs to handle
215 # buckets with dots in their names. Using path-style URLs here in
216 # the meantime seems reasonable, compared to alternatives like forcing
217 # HTTP or ignoring certificate validation.
218 #
219 # https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
220 bucket_endpoint = f'https://{s3_endpoint_parts.netloc}/{bucket}'
221 request = Request(bucket_endpoint, method='HEAD')
222 try:
223 # For private buckets the head request will always raise an
224 # http error, the status code and response headers provide
225 # context for where the bucket is. For public buckets we
226 # default to raising an exception as unsuitable location at
227 # least for the output use case.
228 #
229 # Dynamic use of urllib trips up static analyzers because of
230 # the potential to accidentally allow unexpected schemes like
231 # file:/. Here we're hardcoding the https scheme, so we can
232 # ignore those specific checks.
233 #
234 # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected # noqa
235 response = url_socket_retry(urlopen, request) # nosec B310
236 # Successful response indicates a public accessible bucket in the same region
237 region = response.headers.get('x-amz-bucket-region')
238
239 if not allow_public:
240 raise ValueError("bucket: '{bucket}' is publicly accessible")
241 except HTTPError as err:
242 # Returns 404 'Not Found' for buckets that don't exist
243 if err.status == 404:
244 raise ValueError(f"bucket '{bucket}' does not exist")
245 # Permission errors (403) or redirects (301) for valid buckets
246 # should still contain a header we can use to determine the
247 # bucket region. Permission errors are indicative of correct
248 # region, while redirects are for cross region.
249 region = err.headers.get('x-amz-bucket-region')
250
251 return region
252
253
254def url_socket_retry(func, *args, **kw):
255 """retry a urllib operation in the event of certain errors.
256
257 we want to retry on some common issues for cases where we are
258 connecting through an intermediary proxy or where the downstream
259 is overloaded.
260
261 socket errors
262 - 104 - Connection reset by peer
263 - 110 - Connection timed out
264
265 http errors
266 - 503 - Slow Down | Service Unavailable
267 """
268 min_delay = 1
269 max_delay = 32
270 max_attempts = 4
271
272 for idx, delay in enumerate(
273 backoff_delays(min_delay, max_delay, jitter=True)):
274 try:
275 return func(*args, **kw)
276 except HTTPError as err:
277 if not (err.status == 503 and 'Slow Down' in err.reason):
278 raise
279 if idx == max_attempts - 1:
280 raise
281 except URLError as err:
282 if not isinstance(err.reason, socket.error):
283 raise
284 if err.reason.errno not in (104, 110):
285 raise
286 if idx == max_attempts - 1:
287 raise
288
289 time.sleep(delay)
290
291
292class Arn(namedtuple('_Arn', (
293 'arn', 'partition', 'service', 'region',
294 'account_id', 'resource', 'resource_type', 'separator'))):
295
296 __slots__ = ()
297
298 def __repr__(self):
299 return "<arn:%s:%s:%s:%s:%s%s%s>" % (
300 self.partition,
301 self.service,
302 self.region,
303 self.account_id,
304 self.resource_type,
305 self.separator,
306 self.resource)
307
308 @classmethod
309 def parse(cls, arn):
310 if isinstance(arn, Arn):
311 return arn
312 parts = arn.split(':', 5)
313 if len(parts) < 3:
314 raise ValueError("Invalid Arn")
315 # a few resources use qualifiers without specifying type
316 if parts[2] in ('s3', 'apigateway', 'execute-api', 'emr-serverless'):
317 parts.append(None)
318 parts.append(None)
319 elif '/' in parts[-1]:
320 parts.extend(reversed(parts.pop(-1).split('/', 1)))
321 parts.append('/')
322 elif ':' in parts[-1]:
323 parts.extend(reversed(parts.pop(-1).split(':', 1)))
324 parts.append(':')
325 elif len(parts) == 6:
326 parts.append('')
327 parts.append('')
328 # replace the literal 'arn' string with raw arn
329 parts[0] = arn
330 return cls(*parts)
331
332
333class ArnResolver:
334
335 def __init__(self, manager):
336 self.manager = manager
337
338 def resolve(self, arns):
339 arns = map(Arn.parse, arns)
340 a_service = operator.attrgetter('service')
341 a_resource = operator.attrgetter('resource_type')
342 kfunc = lambda a: (a_service(a), a_resource(a)) # noqa
343 arns = sorted(arns, key=kfunc)
344 results = {}
345 for (service, arn_type), arn_set in itertools.groupby(arns, key=kfunc):
346 arn_set = list(arn_set)
347 rtype = ArnResolver.resolve_type(arn_set[0])
348 rmanager = self.manager.get_resource_manager(rtype)
349 if rtype == 'sns':
350 resources = rmanager.get_resources(
351 [rarn.arn for rarn in arn_set])
352 else:
353 resources = rmanager.get_resources(
354 [rarn.resource for rarn in arn_set])
355 for rarn, r in zip(rmanager.get_arns(resources), resources):
356 results[rarn] = r
357
358 for rarn in arn_set:
359 if rarn.arn not in results:
360 results[rarn.arn] = None
361 return results
362
363 @staticmethod
364 def resolve_type(arn):
365 arn = Arn.parse(arn)
366
367 # this would benefit from a class cache {service} -> rtypes
368 for type_name, klass in AWS.resources.items():
369 if type_name in ('rest-account', 'account') or klass.resource_type.arn is False:
370 continue
371 if arn.service != (klass.resource_type.arn_service or klass.resource_type.service):
372 continue
373 if (type_name in ('asg', 'ecs-task') and
374 "%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
375 in arn.resource_type):
376 return type_name
377 elif (klass.resource_type.arn_type is not None and
378 klass.resource_type.arn_type == arn.resource_type):
379 return type_name
380 elif (klass.resource_type.arn_service == arn.service and
381 klass.resource_type.arn_type == ""):
382 return type_name
383
384
385@metrics_outputs.register('aws')
386class MetricsOutput(Metrics):
387 """Send metrics data to cloudwatch
388 """
389
390 permissions = ("cloudWatch:PutMetricData",)
391 retry = staticmethod(utils.get_retry(('Throttling',)))
392
393 def __init__(self, ctx, config=None):
394 super(MetricsOutput, self).__init__(ctx, config)
395 self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
396 self.region = self.config.get('region')
397 self.ignore_zero = self.config.get('ignore_zero')
398 am = self.config.get('active_metrics')
399 self.active_metrics = am and am.split(',')
400 self.destination = (
401 self.config.scheme == 'aws' and
402 self.config.get('netloc') == 'master') and 'master' or None
403
404 def _format_metric(self, key, value, unit, dimensions):
405 d = {
406 "MetricName": key,
407 "Timestamp": datetime.datetime.utcnow(),
408 "Value": value,
409 "Unit": unit}
410 d["Dimensions"] = [
411 {"Name": "Policy", "Value": self.ctx.policy.name},
412 {"Name": "ResType", "Value": self.ctx.policy.resource_type}]
413 for k, v in dimensions.items():
414 # Skip legacy static dimensions if using new capabilities
415 if (self.destination or self.region) and k == 'Scope':
416 continue
417 d['Dimensions'].append({"Name": k, "Value": v})
418 if self.region:
419 d['Dimensions'].append(
420 {'Name': 'Region', 'Value': self.ctx.options.region})
421 if self.destination:
422 d['Dimensions'].append(
423 {'Name': 'Account', 'Value': self.ctx.options.account_id or ''})
424 return d
425
426 def _put_metrics(self, ns, metrics):
427 if self.destination == 'master':
428 watch = self.ctx.session_factory(
429 assume=False).client('cloudwatch', region_name=self.region)
430 else:
431 watch = utils.local_session(
432 self.ctx.session_factory).client('cloudwatch', region_name=self.region)
433
434 # NOTE filter out value is 0 metrics data
435 if self.ignore_zero in ['1', 'true', 'True']:
436 metrics = [m for m in metrics if m.get("Value") != 0]
437 # NOTE filter metrics data by the metric name configured
438 if self.active_metrics:
439 metrics = [m for m in metrics if m["MetricName"] in self.active_metrics]
440 if not metrics:
441 return
442 return self.retry(
443 watch.put_metric_data, Namespace=ns, MetricData=metrics)
444
445
446@log_outputs.register('aws')
447class CloudWatchLogOutput(LogOutput):
448
449 log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
450
451 def __init__(self, ctx, config=None):
452 super(CloudWatchLogOutput, self).__init__(ctx, config)
453 if self.config['netloc'] == 'master' or not self.config['netloc']:
454 self.log_group = self.config['path'].strip('/')
455 else:
456 # join netloc to path for casual usages of aws://log/group/name
457 self.log_group = ("%s/%s" % (
458 self.config['netloc'], self.config['path'].strip('/'))).strip('/')
459 self.region = self.config.get('region', ctx.options.region)
460 self.destination = (
461 self.config.scheme == 'aws' and
462 self.config.get('netloc') == 'master') and 'master' or None
463
464 def construct_stream_name(self):
465 if self.config.get('stream') is None:
466 log_stream = self.ctx.policy.name
467 if self.config.get('region') is not None:
468 log_stream = "{}/{}".format(self.ctx.options.region, log_stream)
469 if self.config.get('netloc') == 'master':
470 log_stream = "{}/{}".format(self.ctx.options.account_id, log_stream)
471 else:
472 log_stream = self.config.get('stream').format(
473 region=self.ctx.options.region,
474 account=self.ctx.options.account_id,
475 policy=self.ctx.policy.name,
476 now=datetime.datetime.utcnow())
477 return log_stream
478
479 def get_handler(self):
480 log_stream = self.construct_stream_name()
481 params = dict(
482 log_group=self.log_group, log_stream=log_stream,
483 session_factory=(
484 lambda x=None: self.ctx.session_factory(
485 region=self.region, assume=self.destination != 'master')))
486 return CloudWatchLogHandler(**params)
487
488 def __repr__(self):
489 return "<%s to group:%s stream:%s>" % (
490 self.__class__.__name__,
491 self.ctx.options.log_group,
492 self.ctx.policy.name)
493
494
495class XrayEmitter:
496 # implement https://github.com/aws/aws-xray-sdk-python/issues/51
497
498 def __init__(self):
499 self.buf = []
500 self.client = None
501
502 def send_entity(self, entity):
503 self.buf.append(entity)
504 if len(self.buf) > 49:
505 self.flush()
506
507 def flush(self):
508 buf = self.buf
509 self.buf = []
510 for segment_set in utils.chunks(buf, 50):
511 self.client.put_trace_segments(
512 TraceSegmentDocuments=[s.serialize() for s in segment_set])
513
514
515class XrayContext(Context):
516 """Specialized XRay Context for Custodian.
517
518 A context is used as a segment storage stack for currently in
519 progress segments.
520
521 We use a customized context for custodian as policy execution
522 commonly uses a concurrent.futures threadpool pattern during
523 execution for api concurrency. Default xray semantics would use
524 thread local storage and treat each of those as separate trace
525 executions. We want to aggregate/associate all thread pool api
526 executions to the custoidan policy execution. XRay sdk supports
527 this via manual code for every thread pool usage, but we don't
528 want to explicitly couple xray integration everywhere across the
529 codebase. Instead we use a context that is aware of custodian
530 usage of threads and associates subsegments therein to the policy
531 execution active subsegment.
532 """
533
534 def __init__(self, *args, **kw):
535 super(XrayContext, self).__init__(*args, **kw)
536 self._local = Bag()
537 self._current_subsegment = None
538 self._main_tid = threading.get_ident()
539
540 def handle_context_missing(self):
541 """Custodian has a few api calls out of band of policy execution.
542
543 - Resolving account alias.
544 - Cloudwatch Log group/stream discovery/creation (when using -l on cli)
545
546 Also we want to folks to optionally based on configuration using xray
547 so default to disabling context missing output.
548 """
549
550 # Annotate any segments/subsegments with their thread ids.
551 def put_segment(self, segment):
552 if getattr(segment, 'thread_id', None) is None:
553 segment.thread_id = threading.get_ident()
554 super().put_segment(segment)
555
556 def put_subsegment(self, subsegment):
557 if getattr(subsegment, 'thread_id', None) is None:
558 subsegment.thread_id = threading.get_ident()
559 super().put_subsegment(subsegment)
560
561 # Override since we're not just popping the end of the stack, we're removing
562 # the thread subsegment from the array by identity.
563 def end_subsegment(self, end_time):
564 subsegment = self.get_trace_entity()
565 if self._is_subsegment(subsegment):
566 subsegment.close(end_time)
567 self._local.entities.remove(subsegment)
568 return True
569 else:
570 log.warning("No subsegment to end.")
571 return False
572
573 # Override get trace identity, any worker thread will find its own subsegment
574 # on the stack, else will use the main thread's sub/segment
575 def get_trace_entity(self):
576 tid = threading.get_ident()
577 entities = self._local.get('entities', ())
578 for s in reversed(entities):
579 if s.thread_id == tid:
580 return s
581 # custodian main thread won't advance (create new segment)
582 # with worker threads still doing pool work.
583 elif s.thread_id == self._main_tid:
584 return s
585 return self.handle_context_missing()
586
587
588@tracer_outputs.register('xray', condition=HAVE_XRAY)
589class XrayTracer:
590
591 emitter = XrayEmitter()
592
593 in_lambda = 'LAMBDA_TASK_ROOT' in os.environ
594 use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ
595 service_name = 'custodian'
596
597 @classmethod
598 def initialize(cls, config):
599 context = XrayContext()
600 sampling = config.get('sample', 'true') == 'true' and True or False
601 xray_recorder.configure(
602 emitter=cls.use_daemon is False and cls.emitter or None,
603 context=context,
604 sampling=sampling,
605 context_missing='LOG_ERROR')
606 patch(['boto3', 'requests'])
607 logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR)
608
609 def __init__(self, ctx, config):
610 self.ctx = ctx
611 self.config = config or {}
612 self.client = None
613 self.metadata = {}
614
615 @contextlib.contextmanager
616 def subsegment(self, name):
617 segment = xray_recorder.begin_subsegment(name)
618 try:
619 yield segment
620 except Exception as e:
621 stack = traceback.extract_stack(limit=xray_recorder.max_trace_back)
622 segment.add_exception(e, stack)
623 raise
624 finally:
625 xray_recorder.end_subsegment(time.time())
626
627 def __enter__(self):
628 if self.client is None:
629 self.client = self.ctx.session_factory(assume=False).client('xray')
630
631 self.emitter.client = self.client
632
633 if self.in_lambda:
634 self.segment = xray_recorder.begin_subsegment(self.service_name)
635 else:
636 self.segment = xray_recorder.begin_segment(
637 self.service_name, sampling=True)
638
639 p = self.ctx.policy
640 xray_recorder.put_annotation('policy', p.name)
641 xray_recorder.put_annotation('resource', p.resource_type)
642 if self.ctx.options.account_id:
643 xray_recorder.put_annotation('account', self.ctx.options.account_id)
644
645 def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
646 metadata = self.ctx.get_metadata(('api-stats',))
647 metadata.update(self.metadata)
648 xray_recorder.put_metadata('custodian', metadata)
649 if self.in_lambda:
650 xray_recorder.end_subsegment()
651 return
652 xray_recorder.end_segment()
653 if not self.use_daemon:
654 self.emitter.flush()
655 log.info(
656 ('View XRay Trace https://console.aws.amazon.com/xray/home?region=%s#/'
657 'traces/%s' % (self.ctx.options.region, self.segment.trace_id)))
658 self.metadata.clear()
659
660
661@api_stats_outputs.register('aws')
662class ApiStats(DeltaStats):
663
664 def __init__(self, ctx, config=None):
665 super(ApiStats, self).__init__(ctx, config)
666 self.api_calls = Counter()
667
668 def get_snapshot(self):
669 return dict(self.api_calls)
670
671 def get_metadata(self):
672 return self.get_snapshot()
673
674 def __enter__(self):
675 if isinstance(self.ctx.session_factory, credentials.SessionFactory):
676 self.ctx.session_factory.set_subscribers((self,))
677 self.push_snapshot()
678
679 def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
680 if isinstance(self.ctx.session_factory, credentials.SessionFactory):
681 self.ctx.session_factory.set_subscribers(())
682
683 # With cached sessions, we need to unregister any events subscribers
684 # on extant sessions to allow for the next registration.
685 utils.local_session(self.ctx.session_factory).events.unregister(
686 'after-call.*.*', self._record, unique_id='c7n-api-stats')
687
688 self.ctx.metrics.put_metric(
689 "ApiCalls", sum(self.api_calls.values()), "Count")
690 self.pop_snapshot()
691
692 def __call__(self, s):
693 s.events.register(
694 'after-call.*.*', self._record, unique_id='c7n-api-stats')
695
696 def _record(self, http_response, parsed, model, **kwargs):
697 self.api_calls["%s.%s" % (
698 model.service_model.endpoint_prefix, model.name)] += 1
699
700
701@blob_outputs.register('s3')
702class S3Output(BlobOutput):
703 """
704 Usage:
705
706 .. code-block:: python
707
708 with S3Output(session_factory, 's3://bucket/prefix'):
709 log.info('xyz') # -> log messages sent to custodian-run.log.gz
710
711 """
712
713 permissions = ('S3:PutObject',)
714
715 def __init__(self, ctx, config):
716 super().__init__(ctx, config)
717 self._transfer = None
718
719 @property
720 def transfer(self):
721 if self._transfer:
722 return self._transfer
723 bucket_region = self.config.region or None
724 self._transfer = S3Transfer(
725 self.ctx.session_factory(region=bucket_region, assume=False).client('s3'))
726 return self._transfer
727
728 def upload_file(self, path, key):
729 self.transfer.upload_file(
730 path, self.bucket, key,
731 extra_args={
732 'ACL': 'bucket-owner-full-control',
733 'ServerSideEncryption': 'AES256'})
734
735
736@clouds.register('aws')
737class AWS(Provider):
738
739 display_name = 'AWS'
740 resource_prefix = 'aws'
741 # legacy path for older plugins
742 resources = PluginRegistry('resources')
743 # import paths for resources
744 resource_map = ResourceMap
745
746 def initialize(self, options):
747 """
748 """
749 _default_region(options)
750 _default_account_id(options)
751 _default_bucket_region(options)
752
753 if options.tracer and options.tracer.startswith('xray') and HAVE_XRAY:
754 XrayTracer.initialize(utils.parse_url_config(options.tracer))
755 return options
756
757 def get_session_factory(self, options):
758 return SessionFactory(
759 options.region,
760 options.profile,
761 options.assume_role,
762 options.external_id,
763 options.session_policy)
764
765 def initialize_policies(self, policy_collection, options):
766 """Return a set of policies targetted to the given regions.
767
768 Supports symbolic regions like 'all'. This will automatically
769 filter out policies if they are being targetted to a region that
770 does not support the service. Global services will target a
771 single region (us-east-1 if only all specified, else first
772 region in the list).
773
774 Note for region partitions (govcloud and china) an explicit
775 region from the partition must be passed in.
776 """
777 from c7n.policy import Policy, PolicyCollection
778 policies = []
779 service_region_map, resource_service_map = get_service_region_map(
780 options.regions, policy_collection.resource_types, self.type)
781 if 'all' in options.regions:
782 enabled_regions = {
783 r['RegionName'] for r in
784 get_profile_session(options).client('ec2').describe_regions(
785 Filters=[{'Name': 'opt-in-status',
786 'Values': ['opt-in-not-required', 'opted-in']}]
787 ).get('Regions')}
788 for p in policy_collection:
789 if 'aws.' in p.resource_type:
790 _, resource_type = p.resource_type.split('.', 1)
791 else:
792 resource_type = p.resource_type
793 available_regions = service_region_map.get(
794 resource_service_map.get(resource_type), ())
795
796 # its a global service/endpoint, use user provided region
797 # or us-east-1.
798 if not available_regions and options.regions:
799 candidates = [r for r in options.regions if r != 'all']
800 candidate = candidates and candidates[0] or 'us-east-1'
801 svc_regions = [candidate]
802 elif 'all' in options.regions:
803 svc_regions = list(set(available_regions).intersection(enabled_regions))
804 else:
805 svc_regions = options.regions
806
807 for region in svc_regions:
808 if available_regions and region not in available_regions:
809 level = ('all' in options.regions and
810 logging.DEBUG or logging.WARNING)
811 # TODO: fixme
812 policy_collection.log.log(
813 level, "policy:%s resources:%s not available in region:%s",
814 p.name, p.resource_type, region)
815 continue
816 options_copy = copy.copy(options)
817 options_copy.region = str(region)
818
819 if len(options.regions) > 1 or 'all' in options.regions and getattr(
820 options, 'output_dir', None):
821 options_copy.output_dir = join_output(options.output_dir, region)
822 policies.append(
823 Policy(p.data, options_copy,
824 session_factory=policy_collection.session_factory()))
825
826 return PolicyCollection(
827 # order policies by region to minimize local session invalidation.
828 # note relative ordering of policies must be preserved, python sort
829 # is stable.
830 sorted(policies, key=operator.attrgetter('options.region')),
831 options)
832
833
834def join_output(output_dir, suffix):
835 if '{region}' in output_dir:
836 return output_dir.rstrip('/')
837 if output_dir.endswith('://'):
838 return output_dir + suffix
839 output_url_parts = urlparse.urlparse(output_dir)
840 # for output urls, the end of the url may be a
841 # query string. make sure we add a suffix to
842 # the path component.
843 output_url_parts = output_url_parts._replace(
844 path=output_url_parts.path.rstrip('/') + '/%s' % suffix
845 )
846 return urlparse.urlunparse(output_url_parts)
847
848
849def fake_session():
850 session = boto3.Session( # nosec nosemgrep
851 region_name='us-east-1',
852 aws_access_key_id='never',
853 aws_secret_access_key='found')
854 return session
855
856
857def get_service_region_map(regions, resource_types, provider='aws'):
858 # we're not interacting with the apis just using the sdk meta information.
859
860 session = fake_session()
861 normalized_types = []
862 for r in resource_types:
863 if r.startswith('%s.' % provider):
864 normalized_types.append(r[len(provider) + 1:])
865 else:
866 normalized_types.append(r)
867 resource_service_map = {
868 r: clouds[provider].resources.get(r).resource_type.service
869 for r in normalized_types if r != 'account'}
870 # support for govcloud, china, and iso. We only utilize these regions if they
871 # are explicitly passed in on the cli.
872 partition_regions = {}
873 for p in ('aws-cn', 'aws-us-gov', 'aws-iso'):
874 for r in session.get_available_regions('s3', partition_name=p):
875 partition_regions[r] = p
876
877 partitions = ['aws']
878 for r in regions:
879 if r in partition_regions:
880 partitions.append(partition_regions[r])
881
882 service_region_map = {}
883 for s in set(itertools.chain(resource_service_map.values())):
884 for partition in partitions:
885 service_region_map.setdefault(s, []).extend(
886 session.get_available_regions(s, partition_name=partition))
887 return service_region_map, resource_service_map
888
889
890def shape_schema(service, shape_name, drop_fields=()):
891 """Expand a shape's schema using service model shape data
892
893 Repurpose some of the shape discovery/validation logic in
894 c7n.resources.aws.shape_validate() to dynamically expand
895 element schema using the latest service model shape information.
896
897 Include available properties, their types, and enumerations of
898 possible values where available.
899
900 Args:
901 service (str): The AWS service for the element. (required)
902 shape_name (str): The service model request shape name. (required)
903 drop_fields (Tuple[str]): List of fields to drop from the schema
904 (e.g. resource_id param).
905 """
906
907 def _expand_shape_schema(shape):
908 schema = {}
909 for member, member_shape in shape.members.items():
910 if member in drop_fields:
911 continue
912 _type = MODEL_SCHEMA_TYPE_MAP.get(member_shape.type_name)
913 if _type is None:
914 raise KeyError(f"Unknown type for {member_shape.name}: {member_shape.type_name}")
915 member_schema = {'type': _type}
916 if enum := getattr(member_shape, 'enum', None):
917 member_schema['enum'] = enum
918 if member_shape.type_name == 'structure':
919 member_schema["properties"] = _expand_shape_schema(member_shape)
920 elif member_shape.type_name == 'list':
921 if member_shape.member.type_name == 'structure':
922 member_schema["items"] = {
923 'type': 'object',
924 'properties': _expand_shape_schema(member_shape.member)
925 }
926 else:
927 member_schema["items"] = {
928 'type': MODEL_SCHEMA_TYPE_MAP.get(member_shape.member.type_name)
929 }
930 elif member_shape.type_name == 'map':
931 if member_shape.value.type_name in ('structure', 'list'):
932 member_schema["patternProperties"] = {
933 "^.+$": _expand_shape_schema(member_shape.value)
934 }
935 else:
936 member_schema["patternProperties"] = {
937 "^.+$": {
938 'type': MODEL_SCHEMA_TYPE_MAP.get(member_shape.value.type_name)
939 }
940 }
941
942 schema[member] = member_schema
943 return schema
944
945 session = fake_session()._session
946 model = session.get_service_model(service)
947 shape = model.shape_for(shape_name)
948
949 return _expand_shape_schema(shape)