Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/resources/aws.py: 31%
485 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1# Copyright The Cloud Custodian Authors.
2# SPDX-License-Identifier: Apache-2.0
4from c7n.provider import clouds, Provider
6from collections import Counter, namedtuple
7import contextlib
8import copy
9import datetime
10import itertools
11import logging
12import os
13import operator
14import socket
15import sys
16import time
17import threading
18import traceback
19from urllib import parse as urlparse
20from urllib.request import urlopen, Request
21from urllib.error import HTTPError, URLError
23import boto3
25from botocore.validate import ParamValidator
26from boto3.s3.transfer import S3Transfer
28from c7n.credentials import SessionFactory
29from c7n.config import Bag
30from c7n.exceptions import InvalidOutputConfig, PolicyValidationError
31from c7n.log import CloudWatchLogHandler
32from c7n.utils import parse_url_config, backoff_delays
34from .resource_map import ResourceMap
36# Import output registries aws provider extends.
37from c7n.output import (
38 api_stats_outputs,
39 blob_outputs,
40 log_outputs,
41 metrics_outputs,
42 tracer_outputs
43)
45# Output base implementations we extend.
46from c7n.output import (
47 Metrics,
48 DeltaStats,
49 BlobOutput,
50 LogOutput,
51)
53from c7n.registry import PluginRegistry
54from c7n import credentials, utils
56log = logging.getLogger('custodian.aws')
# xray tracing support is optional; degrade gracefully when the sdk
# is not installed (HAVE_XRAY gates tracer registration below).
try:
    from aws_xray_sdk.core import xray_recorder, patch
    from aws_xray_sdk.core.context import Context
    HAVE_XRAY = True
except ImportError:
    HAVE_XRAY = False
    # stand-in base so XrayContext below can still be defined.
    class Context: pass  # NOQA
# Memoized boto3 session for the active profile (see get_profile_session).
_profile_session = None

# Default cloudwatch metrics namespace (legacy project name).
DEFAULT_NAMESPACE = "CloudMaid"
def get_profile_session(options):
    """Return a module-cached boto3 session for ``options.profile``.

    The first call creates the session (honoring an optional
    ``profile`` attribute on *options*); later calls reuse it
    regardless of the options passed.
    """
    global _profile_session
    if _profile_session is None:
        profile_name = getattr(options, 'profile', None)
        _profile_session = boto3.Session(profile_name=profile_name)
    return _profile_session
82def _default_region(options):
83 marker = object()
84 value = getattr(options, 'regions', marker)
85 if value is marker:
86 return
88 if len(value) > 0:
89 return
91 try:
92 options.regions = [get_profile_session(options).region_name]
93 except Exception:
94 log.warning('Could not determine default region')
95 options.regions = [None]
97 if options.regions[0] is None:
98 log.error('No default region set. Specify a default via AWS_DEFAULT_REGION '
99 'or setting a region in ~/.aws/config')
100 sys.exit(1)
102 log.debug("using default region:%s from boto" % options.regions[0])
105def _default_account_id(options):
106 if options.account_id:
107 return
108 elif options.assume_role:
109 try:
110 options.account_id = options.assume_role.split(':')[4]
111 return
112 except IndexError:
113 pass
114 try:
115 session = get_profile_session(options)
116 options.account_id = utils.get_account_id_from_sts(session)
117 except Exception:
118 options.account_id = None
121def _default_bucket_region(options):
122 # modify options to format s3 output urls with explicit region.
123 if not options.output_dir.startswith('s3://'):
124 return
126 parsed = urlparse.urlparse(options.output_dir)
127 s3_conf = parse_url_config(options.output_dir)
128 if parsed.query and s3_conf.get("region"):
129 return
131 # s3 clients default to us-east-1 if no region is specified, but for partition
132 # support we default to using a passed in region if given.
133 region = None
134 if options.regions:
135 region = options.regions[0]
137 # we're operating pre the expansion of symbolic name all into actual regions.
138 if region == "all":
139 region = None
141 try:
142 options.output_dir = get_bucket_url_with_region(options.output_dir, region)
143 except ValueError as err:
144 invalid_output = InvalidOutputConfig(str(err))
145 invalid_output.__suppress_context__ = True
146 raise invalid_output
def shape_validate(params, shape_name, service):
    """Validate *params* against a botocore request shape.

    Raises PolicyValidationError with the validator's report when the
    parameters do not conform to the named shape of *service*.
    """
    session = fake_session()._session
    service_model = session.get_service_model(service)
    shape = service_model.shape_for(shape_name)
    report = ParamValidator().validate(params, shape)
    if report.has_errors():
        raise PolicyValidationError(report.generate_report())
def get_bucket_url_with_region(bucket_url, region):
    """Return *bucket_url* with an explicit ``region`` query parameter.

    Resolves the bucket's actual region via an unauthenticated HEAD
    probe (see inspect_bucket_region) using a client in *region* when
    given, and appends it to the url's query string.

    :raises ValueError: when the bucket region cannot be determined.
    """
    parsed = urlparse.urlparse(bucket_url)
    s3_conf = parse_url_config(bucket_url)
    params = {}
    if region:
        params['region_name'] = region

    client = boto3.client('s3', **params)
    region = inspect_bucket_region(s3_conf.netloc, client.meta.endpoint_url)
    if not region:
        raise ValueError(f"could not determine region for output bucket, use explicit ?region=region_name. {s3_conf.url}")  # noqa
    query = f"region={region}"
    if parsed.query:
        # Bugfix: join with a literal '&' separator; the separator had been
        # mangled into the single character '®' (an HTML '&reg' entity
        # artifact), which corrupted the resulting query string.
        query = parsed.query + f"&region={region}"
    parts = parsed._replace(
        path=parsed.path.strip("/"),
        query=query
    )
    return urlparse.urlunparse(parts)
def inspect_bucket_region(bucket, s3_endpoint, allow_public=False):
    """Attempt to determine a bucket region without a client

    We can make an unauthenticated HTTP HEAD request to S3 in an attempt to find a bucket's
    region. This avoids some issues with cross-account/cross-region uses of the
    GetBucketLocation or HeadBucket API action. Because bucket names are unique within
    AWS partitions, we can make requests to a single regional S3 endpoint
    and get redirected if a bucket lives in another region within the
    same partition.

    This approach is inspired by some sample code from a Go SDK issue comment,
    which @sean-zou mentioned in #7593:

    https://github.com/aws/aws-sdk-go/issues/720#issuecomment-613038544

    Return a region string, or None if we're unable to determine one.
    """
    region = None
    s3_endpoint_parts = urlparse.urlparse(s3_endpoint)
    # Use a "path-style" S3 URL here to avoid failing TLS certificate validation
    # on buckets with a dot in the name.
    #
    # According to the following blog post, before deprecating path-style
    # URLs AWS will provide a way for virtual-hosted-style URLs to handle
    # buckets with dots in their names. Using path-style URLs here in
    # the meantime seems reasonable, compared to alternatives like forcing
    # HTTP or ignoring certificate validation.
    #
    # https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
    bucket_endpoint = f'https://{s3_endpoint_parts.netloc}/{bucket}'
    request = Request(bucket_endpoint, method='HEAD')
    try:
        # For private buckets the head request will always raise an
        # http error, the status code and response headers provide
        # context for where the bucket is. For public buckets we
        # default to raising an exception as unsuitable location at
        # least for the output use case.
        #
        # Dynamic use of urllib trips up static analyzers because of
        # the potential to accidentally allow unexpected schemes like
        # file:/. Here we're hardcoding the https scheme, so we can
        # ignore those specific checks.
        #
        # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected # noqa
        response = url_socket_retry(urlopen, request)  # nosec B310
        # Successful response indicates a public accessible bucket in the same region
        region = response.headers.get('x-amz-bucket-region')

        if not allow_public:
            # Bugfix: this message was missing its f-prefix, so callers saw
            # the literal text "{bucket}" instead of the bucket name.
            raise ValueError(f"bucket: '{bucket}' is publicly accessible")
    except HTTPError as err:
        # Returns 404 'Not Found' for buckets that don't exist
        if err.status == 404:
            raise ValueError(f"bucket '{bucket}' does not exist")
        # Permission errors (403) or redirects (301) for valid buckets
        # should still contain a header we can use to determine the
        # bucket region. Permission errors are indicative of correct
        # region, while redirects are for cross region.
        region = err.headers.get('x-amz-bucket-region')

    return region
def url_socket_retry(func, *args, **kw):
    """retry a urllib operation in the event of certain errors.

    we want to retry on some common issues for cases where we are
    connecting through an intermediary proxy or where the downstream
    is overloaded.

    socket errors
      - 104 - Connection reset by peer
      - 110 - Connection timed out

    http errors
      - 503 - Slow Down | Service Unavailable
    """
    min_delay = 1
    max_delay = 32
    max_attempts = 4

    for idx, delay in enumerate(
            backoff_delays(min_delay, max_delay, jitter=True)):
        try:
            return func(*args, **kw)
        except HTTPError as err:
            # only the throttling variant of 503 is retryable; anything
            # else propagates, as does the final attempt's failure.
            if not (err.status == 503 and 'Slow Down' in err.reason):
                raise
            if idx == max_attempts - 1:
                raise
        except URLError as err:
            # retry only connection-reset / timed-out socket errors.
            if not isinstance(err.reason, socket.error):
                raise
            if err.reason.errno not in (104, 110):
                raise
            if idx == max_attempts - 1:
                raise

        time.sleep(delay)
class Arn(namedtuple('_Arn', (
        'arn', 'partition', 'service', 'region',
        'account_id', 'resource', 'resource_type', 'separator'))):
    """Immutable value object parsing an AWS arn into its components."""

    __slots__ = ()

    def __repr__(self):
        return "<arn:{}:{}:{}:{}:{}{}{}>".format(
            self.partition,
            self.service,
            self.region,
            self.account_id,
            self.resource_type,
            self.separator,
            self.resource)

    @classmethod
    def parse(cls, arn):
        """Parse an arn string (or pass through an Arn) into an Arn."""
        if isinstance(arn, Arn):
            return arn
        segments = arn.split(':', 5)
        if len(segments) < 3:
            raise ValueError("Invalid Arn")
        # a few resources use qualifiers without specifying type
        if segments[2] in ('s3', 'apigateway', 'execute-api', 'emr-serverless'):
            segments.extend((None, None))
        elif '/' in segments[-1]:
            rtype, rid = segments.pop(-1).split('/', 1)
            segments.extend((rid, rtype, '/'))
        elif ':' in segments[-1]:
            rtype, rid = segments.pop(-1).split(':', 1)
            segments.extend((rid, rtype, ':'))
        elif len(segments) == 6:
            segments.extend(('', ''))
        # replace the literal 'arn' prefix with the full raw arn
        segments[0] = arn
        return cls(*segments)
class ArnResolver:
    """Resolve a collection of arns to their resource records."""

    def __init__(self, manager):
        # resource manager used to instantiate per-type managers.
        self.manager = manager

    def resolve(self, arns):
        """Return a mapping of arn -> resource record (None when unresolved)."""
        arns = map(Arn.parse, arns)
        a_service = operator.attrgetter('service')
        a_resource = operator.attrgetter('resource_type')
        kfunc = lambda a: (a_service(a), a_resource(a))  # noqa
        # groupby requires sorting on the same key.
        arns = sorted(arns, key=kfunc)
        results = {}
        for (service, arn_type), arn_set in itertools.groupby(arns, key=kfunc):
            arn_set = list(arn_set)
            rtype = ArnResolver.resolve_type(arn_set[0])
            rmanager = self.manager.get_resource_manager(rtype)
            # sns topics are fetched by full arn, other types by resource id.
            if rtype == 'sns':
                resources = rmanager.get_resources(
                    [rarn.arn for rarn in arn_set])
            else:
                resources = rmanager.get_resources(
                    [rarn.resource for rarn in arn_set])
            for rarn, r in zip(rmanager.get_arns(resources), resources):
                results[rarn] = r

            # mark arns that could not be fetched as unresolved.
            for rarn in arn_set:
                if rarn.arn not in results:
                    results[rarn.arn] = None
        return results

    @staticmethod
    def resolve_type(arn):
        """Map an arn to a registered custodian resource type name.

        Returns None implicitly when no registered type matches.
        """
        arn = Arn.parse(arn)

        # this would benefit from a class cache {service} -> rtypes
        for type_name, klass in AWS.resources.items():
            if type_name in ('rest-account', 'account') or klass.resource_type.arn is False:
                continue
            if arn.service != (klass.resource_type.arn_service or klass.resource_type.service):
                continue
            # asg/ecs-task arns carry a qualified type prefix in resource_type.
            if (type_name in ('asg', 'ecs-task') and
                    "%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
                    in arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_type is not None and
                    klass.resource_type.arn_type == arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_service == arn.service and
                    klass.resource_type.arn_type == ""):
                return type_name
@metrics_outputs.register('aws')
class MetricsOutput(Metrics):
    """Send metrics data to cloudwatch
    """

    permissions = ("cloudWatch:PutMetricData",)
    retry = staticmethod(utils.get_retry(('Throttling',)))

    def __init__(self, ctx, config=None):
        super(MetricsOutput, self).__init__(ctx, config)
        self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
        self.region = self.config.get('region')
        self.ignore_zero = self.config.get('ignore_zero')
        am = self.config.get('active_metrics')
        # comma separated allow-list of metric names, or falsy for all.
        self.active_metrics = am and am.split(',')
        # 'master' when the output url is aws://master, routing metrics to
        # the assuming account's cloudwatch instead of the target account's.
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def _format_metric(self, key, value, unit, dimensions):
        """Build a single cloudwatch PutMetricData record."""
        d = {
            "MetricName": key,
            "Timestamp": datetime.datetime.utcnow(),
            "Value": value,
            "Unit": unit}
        d["Dimensions"] = [
            {"Name": "Policy", "Value": self.ctx.policy.name},
            {"Name": "ResType", "Value": self.ctx.policy.resource_type}]
        for k, v in dimensions.items():
            # Skip legacy static dimensions if using new capabilities
            if (self.destination or self.region) and k == 'Scope':
                continue
            d['Dimensions'].append({"Name": k, "Value": v})
        if self.region:
            d['Dimensions'].append(
                {'Name': 'Region', 'Value': self.ctx.options.region})
        if self.destination:
            d['Dimensions'].append(
                {'Name': 'Account', 'Value': self.ctx.options.account_id or ''})
        return d

    def _put_metrics(self, ns, metrics):
        """Publish a batch of metric records to cloudwatch, with retry."""
        if self.destination == 'master':
            watch = self.ctx.session_factory(
                assume=False).client('cloudwatch', region_name=self.region)
        else:
            watch = utils.local_session(
                self.ctx.session_factory).client('cloudwatch', region_name=self.region)

        # NOTE filter out value is 0 metrics data
        if self.ignore_zero in ['1', 'true', 'True']:
            metrics = [m for m in metrics if m.get("Value") != 0]
        # NOTE filter metrics data by the metric name configured
        if self.active_metrics:
            metrics = [m for m in metrics if m["MetricName"] in self.active_metrics]
        if not metrics:
            return
        return self.retry(
            watch.put_metric_data, Namespace=ns, MetricData=metrics)
@log_outputs.register('aws')
class CloudWatchLogOutput(LogOutput):
    """Ship policy execution logs to a cloudwatch log group/stream."""

    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'

    def __init__(self, ctx, config=None):
        super(CloudWatchLogOutput, self).__init__(ctx, config)
        if self.config['netloc'] == 'master' or not self.config['netloc']:
            self.log_group = self.config['path'].strip('/')
        else:
            # join netloc to path for casual usages of aws://log/group/name
            self.log_group = ("%s/%s" % (
                self.config['netloc'], self.config['path'].strip('/'))).strip('/')
        self.region = self.config.get('region', ctx.options.region)
        # 'master' when the output url is aws://master, routing logs to the
        # assuming account instead of the target account.
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def construct_stream_name(self):
        """Derive the log stream name from config or policy/region/account."""
        if self.config.get('stream') is None:
            log_stream = self.ctx.policy.name
            if self.config.get('region') is not None:
                log_stream = "{}/{}".format(self.ctx.options.region, log_stream)
            if self.config.get('netloc') == 'master':
                log_stream = "{}/{}".format(self.ctx.options.account_id, log_stream)
        else:
            # user supplied stream template with region/account/policy/now vars.
            log_stream = self.config.get('stream').format(
                region=self.ctx.options.region,
                account=self.ctx.options.account_id,
                policy=self.ctx.policy.name,
                now=datetime.datetime.utcnow())
        return log_stream

    def get_handler(self):
        """Return a logging handler that ships records to cloudwatch logs."""
        log_stream = self.construct_stream_name()
        params = dict(
            log_group=self.log_group, log_stream=log_stream,
            session_factory=(
                lambda x=None: self.ctx.session_factory(
                    region=self.region, assume=self.destination != 'master')))
        return CloudWatchLogHandler(**params)

    def __repr__(self):
        return "<%s to group:%s stream:%s>" % (
            self.__class__.__name__,
            self.ctx.options.log_group,
            self.ctx.policy.name)
class XrayEmitter:
    """Buffering emitter that ships trace segments via the xray api."""
    # implement https://github.com/aws/aws-xray-sdk-python/issues/51

    def __init__(self):
        self.buf = []
        self.client = None

    def send_entity(self, entity):
        """Queue an entity, flushing once the buffer exceeds 49 items."""
        self.buf.append(entity)
        if len(self.buf) > 49:
            self.flush()

    def flush(self):
        """Send all buffered entities in api-sized batches of 50."""
        pending, self.buf = self.buf, []
        for batch in utils.chunks(pending, 50):
            self.client.put_trace_segments(
                TraceSegmentDocuments=[s.serialize() for s in batch])
class XrayContext(Context):
    """Specialized XRay Context for Custodian.

    A context is used as a segment storage stack for currently in
    progress segments.

    We use a customized context for custodian as policy execution
    commonly uses a concurrent.futures threadpool pattern during
    execution for api concurrency. Default xray semantics would use
    thread local storage and treat each of those as separate trace
    executions. We want to aggregate/associate all thread pool api
    executions to the custoidan policy execution. XRay sdk supports
    this via manual code for every thread pool usage, but we don't
    want to explicitly couple xray integration everywhere across the
    codebase. Instead we use a context that is aware of custodian
    usage of threads and associates subsegments therein to the policy
    execution active subsegment.
    """

    def __init__(self, *args, **kw):
        super(XrayContext, self).__init__(*args, **kw)
        # Bag is a plain attribute store (not thread-local), so all
        # threads share a single entity stack.
        self._local = Bag()
        self._current_subsegment = None
        # id of the thread that created the context (the policy main thread).
        self._main_tid = threading.get_ident()

    def handle_context_missing(self):
        """Custodian has a few api calls out of band of policy execution.

        - Resolving account alias.
        - Cloudwatch Log group/stream discovery/creation (when using -l on cli)

        Also we want to folks to optionally based on configuration using xray
        so default to disabling context missing output.
        """

    # Annotate any segments/subsegments with their thread ids.
    def put_segment(self, segment):
        if getattr(segment, 'thread_id', None) is None:
            segment.thread_id = threading.get_ident()
        super().put_segment(segment)

    def put_subsegment(self, subsegment):
        if getattr(subsegment, 'thread_id', None) is None:
            subsegment.thread_id = threading.get_ident()
        super().put_subsegment(subsegment)

    # Override since we're not just popping the end of the stack, we're removing
    # the thread subsegment from the array by identity.
    def end_subsegment(self, end_time):
        subsegment = self.get_trace_entity()
        if self._is_subsegment(subsegment):
            subsegment.close(end_time)
            self._local.entities.remove(subsegment)
            return True
        else:
            log.warning("No subsegment to end.")
            return False

    # Override get trace identity, any worker thread will find its own subsegment
    # on the stack, else will use the main thread's sub/segment
    def get_trace_entity(self):
        tid = threading.get_ident()
        entities = self._local.get('entities', ())
        for s in reversed(entities):
            if s.thread_id == tid:
                return s
            # custodian main thread won't advance (create new segment)
            # with worker threads still doing pool work.
            elif s.thread_id == self._main_tid:
                return s
        return self.handle_context_missing()
@tracer_outputs.register('xray', condition=HAVE_XRAY)
class XrayTracer:
    """Trace policy execution with aws xray."""

    # shared buffering emitter, used when no xray daemon is available.
    emitter = XrayEmitter()

    in_lambda = 'LAMBDA_TASK_ROOT' in os.environ
    use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ
    service_name = 'custodian'

    @classmethod
    def initialize(cls, config):
        """Configure the global xray recorder and patch boto3/requests."""
        context = XrayContext()
        sampling = config.get('sample', 'true') == 'true' and True or False
        xray_recorder.configure(
            # emit via the api-backed emitter unless a local daemon exists.
            emitter=cls.use_daemon is False and cls.emitter or None,
            context=context,
            sampling=sampling,
            context_missing='LOG_ERROR')
        patch(['boto3', 'requests'])
        logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR)

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config or {}
        self.client = None
        self.metadata = {}

    @contextlib.contextmanager
    def subsegment(self, name):
        """Context manager wrapping a block in a named xray subsegment."""
        segment = xray_recorder.begin_subsegment(name)
        try:
            yield segment
        except Exception as e:
            stack = traceback.extract_stack(limit=xray_recorder.max_trace_back)
            segment.add_exception(e, stack)
            raise
        finally:
            xray_recorder.end_subsegment(time.time())

    def __enter__(self):
        """Begin the policy trace segment (a subsegment when in lambda)."""
        if self.client is None:
            self.client = self.ctx.session_factory(assume=False).client('xray')

        self.emitter.client = self.client

        if self.in_lambda:
            self.segment = xray_recorder.begin_subsegment(self.service_name)
        else:
            self.segment = xray_recorder.begin_segment(
                self.service_name, sampling=True)

        p = self.ctx.policy
        xray_recorder.put_annotation('policy', p.name)
        xray_recorder.put_annotation('resource', p.resource_type)
        if self.ctx.options.account_id:
            xray_recorder.put_annotation('account', self.ctx.options.account_id)

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        """Close the trace, flush buffered segments, and log the trace url."""
        metadata = self.ctx.get_metadata(('api-stats',))
        metadata.update(self.metadata)
        xray_recorder.put_metadata('custodian', metadata)
        if self.in_lambda:
            xray_recorder.end_subsegment()
            return
        xray_recorder.end_segment()
        if not self.use_daemon:
            self.emitter.flush()
        log.info(
            ('View XRay Trace https://console.aws.amazon.com/xray/home?region=%s#/'
             'traces/%s' % (self.ctx.options.region, self.segment.trace_id)))
        self.metadata.clear()
@api_stats_outputs.register('aws')
class ApiStats(DeltaStats):
    """Record per-policy aws api call counts via botocore event hooks."""

    def __init__(self, ctx, config=None):
        super(ApiStats, self).__init__(ctx, config)
        self.api_calls = Counter()

    def get_snapshot(self):
        return dict(self.api_calls)

    def get_metadata(self):
        return self.get_snapshot()

    def __enter__(self):
        # subscribe to new session creation so we can hook api call events.
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers((self,))
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers(())

        # With cached sessions, we need to unregister any events subscribers
        # on extant sessions to allow for the next registration.
        utils.local_session(self.ctx.session_factory).events.unregister(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

        self.ctx.metrics.put_metric(
            "ApiCalls", sum(self.api_calls.values()), "Count")
        self.pop_snapshot()

    def __call__(self, s):
        # session-factory subscriber hook: register the call recorder.
        s.events.register(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

    def _record(self, http_response, parsed, model, **kwargs):
        # count calls keyed by service endpoint prefix and operation name.
        self.api_calls["%s.%s" % (
            model.service_model.endpoint_prefix, model.name)] += 1
@blob_outputs.register('s3')
class S3Output(BlobOutput):
    """
    Usage:

    .. code-block:: python

       with S3Output(session_factory, 's3://bucket/prefix'):
           log.info('xyz')  # -> log messages sent to custodian-run.log.gz

    """

    permissions = ('S3:PutObject',)

    def __init__(self, ctx, config):
        super().__init__(ctx, config)
        # lazily constructed S3Transfer; see the transfer property.
        self._transfer = None

    @property
    def transfer(self):
        """Lazily construct an s3 transfer manager in the bucket's region."""
        if self._transfer:
            return self._transfer
        bucket_region = self.config.region or None
        self._transfer = S3Transfer(
            self.ctx.session_factory(region=bucket_region, assume=False).client('s3'))
        return self._transfer

    def upload_file(self, path, key):
        """Upload a local file encrypted, granting the bucket owner control."""
        self.transfer.upload_file(
            path, self.bucket, key,
            extra_args={
                'ACL': 'bucket-owner-full-control',
                'ServerSideEncryption': 'AES256'})
@clouds.register('aws')
class AWS(Provider):
    """AWS provider: option defaults, session factories, region targeting."""

    display_name = 'AWS'
    resource_prefix = 'aws'
    # legacy path for older plugins
    resources = PluginRegistry('resources')
    # import paths for resources
    resource_map = ResourceMap

    def initialize(self, options):
        """Apply provider defaults (region, account id, output bucket
        region) to options and set up xray tracing when configured.
        """
        _default_region(options)
        _default_account_id(options)
        _default_bucket_region(options)

        if options.tracer and options.tracer.startswith('xray') and HAVE_XRAY:
            XrayTracer.initialize(utils.parse_url_config(options.tracer))
        return options

    def get_session_factory(self, options):
        """Return a session factory bound to the cli/config credentials."""
        return SessionFactory(
            options.region,
            options.profile,
            options.assume_role,
            options.external_id)

    def initialize_policies(self, policy_collection, options):
        """Return a set of policies targetted to the given regions.

        Supports symbolic regions like 'all'. This will automatically
        filter out policies if they are being targetted to a region that
        does not support the service. Global services will target a
        single region (us-east-1 if only all specified, else first
        region in the list).

        Note for region partitions (govcloud and china) an explicit
        region from the partition must be passed in.
        """
        from c7n.policy import Policy, PolicyCollection
        policies = []
        service_region_map, resource_service_map = get_service_region_map(
            options.regions, policy_collection.resource_types, self.type)
        if 'all' in options.regions:
            # expand the symbolic 'all' into the account's enabled regions.
            enabled_regions = {
                r['RegionName'] for r in
                get_profile_session(options).client('ec2').describe_regions(
                    Filters=[{'Name': 'opt-in-status',
                              'Values': ['opt-in-not-required', 'opted-in']}]
                ).get('Regions')}
        for p in policy_collection:
            if 'aws.' in p.resource_type:
                _, resource_type = p.resource_type.split('.', 1)
            else:
                resource_type = p.resource_type
            available_regions = service_region_map.get(
                resource_service_map.get(resource_type), ())

            # its a global service/endpoint, use user provided region
            # or us-east-1.
            if not available_regions and options.regions:
                candidates = [r for r in options.regions if r != 'all']
                candidate = candidates and candidates[0] or 'us-east-1'
                svc_regions = [candidate]
            elif 'all' in options.regions:
                svc_regions = list(set(available_regions).intersection(enabled_regions))
            else:
                svc_regions = options.regions

            for region in svc_regions:
                if available_regions and region not in available_regions:
                    level = ('all' in options.regions and
                             logging.DEBUG or logging.WARNING)
                    # TODO: fixme
                    policy_collection.log.log(
                        level, "policy:%s resources:%s not available in region:%s",
                        p.name, p.resource_type, region)
                    continue
                options_copy = copy.copy(options)
                options_copy.region = str(region)

                # keep per-region output directories distinct.
                if len(options.regions) > 1 or 'all' in options.regions and getattr(
                        options, 'output_dir', None):
                    options_copy.output_dir = join_output(options.output_dir, region)
                policies.append(
                    Policy(p.data, options_copy,
                           session_factory=policy_collection.session_factory()))

        return PolicyCollection(
            # order policies by region to minimize local session invalidation.
            # note relative ordering of policies must be preserved, python sort
            # is stable.
            sorted(policies, key=operator.attrgetter('options.region')),
            options)
def join_output(output_dir, suffix):
    """Append *suffix* as a path component of an output location.

    Urls containing a '{region}' placeholder are returned as-is (sans
    trailing slash) for later template expansion; otherwise the suffix
    is grafted onto the url's path, preserving any query string.
    """
    if '{region}' in output_dir:
        return output_dir.rstrip('/')
    if output_dir.endswith('://'):
        return output_dir + suffix
    parts = urlparse.urlparse(output_dir)
    # for output urls, the end of the url may be a query string;
    # add the suffix to the path component, not the raw string.
    parts = parts._replace(
        path="%s/%s" % (parts.path.rstrip('/'), suffix))
    return urlparse.urlunparse(parts)
def fake_session():
    """Return a boto3 session with dummy credentials.

    Used purely for offline sdk model/metadata introspection; the
    credentials are never sent to any service.
    """
    return boto3.Session(  # nosec nosemgrep
        region_name='us-east-1',
        aws_access_key_id='never',
        aws_secret_access_key='found')
def get_service_region_map(regions, resource_types, provider='aws'):
    """Return (service -> available regions, resource type -> service) maps.

    Partition regions (govcloud/china/iso) are considered only when
    explicitly present in *regions*.
    """
    # we're not interacting with the apis just using the sdk meta information.

    session = fake_session()
    normalized_types = []
    for r in resource_types:
        # strip the provider prefix, e.g. 'aws.ec2' -> 'ec2'.
        if r.startswith('%s.' % provider):
            normalized_types.append(r[len(provider) + 1:])
        else:
            normalized_types.append(r)
    resource_service_map = {
        r: clouds[provider].resources.get(r).resource_type.service
        for r in normalized_types if r != 'account'}
    # support for govcloud, china, and iso. We only utilize these regions if they
    # are explicitly passed in on the cli.
    partition_regions = {}
    for p in ('aws-cn', 'aws-us-gov', 'aws-iso'):
        for r in session.get_available_regions('s3', partition_name=p):
            partition_regions[r] = p

    partitions = ['aws']
    for r in regions:
        if r in partition_regions:
            partitions.append(partition_regions[r])

    service_region_map = {}
    for s in set(itertools.chain(resource_service_map.values())):
        for partition in partitions:
            service_region_map.setdefault(s, []).extend(
                session.get_available_regions(s, partition_name=partition))
    return service_region_map, resource_service_map