# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0

from c7n.provider import clouds, Provider

from collections import Counter, namedtuple
import contextlib
import copy
import datetime
import itertools
import logging
import os
import operator
import socket
import sys
import time
import threading
import traceback
from urllib import parse as urlparse
from urllib.request import urlopen, Request
from urllib.error import HTTPError, URLError

import boto3

from botocore.validate import ParamValidator
from boto3.s3.transfer import S3Transfer

from c7n.credentials import SessionFactory
from c7n.config import Bag
from c7n.exceptions import InvalidOutputConfig, PolicyValidationError
from c7n.log import CloudWatchLogHandler
from c7n.utils import parse_url_config, backoff_delays

from .resource_map import ResourceMap

# Import output registries aws provider extends.
from c7n.output import (
    api_stats_outputs,
    blob_outputs,
    log_outputs,
    metrics_outputs,
    tracer_outputs
)

# Output base implementations we extend.
from c7n.output import (
    Metrics,
    DeltaStats,
    BlobOutput,
    LogOutput,
)

from c7n.registry import PluginRegistry
from c7n import credentials, utils

log = logging.getLogger('custodian.aws')

try:
    from aws_xray_sdk.core import xray_recorder, patch
    from aws_xray_sdk.core.context import Context
    HAVE_XRAY = True
except ImportError:
    HAVE_XRAY = False
    class Context: pass # NOQA

_profile_session = None


DEFAULT_NAMESPACE = "CloudMaid"


def get_profile_session(options):
    global _profile_session
    if _profile_session:
        return _profile_session

    profile = getattr(options, 'profile', None)
    _profile_session = boto3.Session(profile_name=profile)
    return _profile_session


def _default_region(options):
    marker = object()
    value = getattr(options, 'regions', marker)
    if value is marker:
        return

    if len(value) > 0:
        return

    try:
        options.regions = [get_profile_session(options).region_name]
    except Exception:
        log.warning('Could not determine default region')
        options.regions = [None]

    if options.regions[0] is None:
        log.error('No default region set. Specify a default via AWS_DEFAULT_REGION '
                  'or setting a region in ~/.aws/config')
        sys.exit(1)

    log.debug("using default region:%s from boto" % options.regions[0])


def _default_account_id(options):
    if options.account_id:
        return
    elif options.assume_role:
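        # the account id is the fifth field of a role arn, e.g.
        # arn:aws:iam::123456789012:role/custodian (illustrative value)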
        try:
            options.account_id = options.assume_role.split(':')[4]
            return
        except IndexError:
            pass
    try:
        session = get_profile_session(options)
        options.account_id = utils.get_account_id_from_sts(session)
    except Exception:
        options.account_id = None


def _default_bucket_region(options):
    # modify options to format s3 output urls with explicit region.
    if not options.output_dir.startswith('s3://'):
        return

    parsed = urlparse.urlparse(options.output_dir)
    s3_conf = parse_url_config(options.output_dir)
    if parsed.query and s3_conf.get("region"):
        return

    # s3 clients default to us-east-1 if no region is specified, but for partition
    # support we default to using a passed in region if given.
    region = None
    if options.regions:
        region = options.regions[0]

    # we're operating before the symbolic name 'all' has been expanded
    # into actual regions.
    if region == "all":
        region = None

    try:
        options.output_dir = get_bucket_url_with_region(options.output_dir, region)
    except ValueError as err:
        invalid_output = InvalidOutputConfig(str(err))
        invalid_output.__suppress_context__ = True
        raise invalid_output


def shape_validate(params, shape_name, service):
    session = fake_session()._session
    model = session.get_service_model(service)
    shape = model.shape_for(shape_name)
    validator = ParamValidator()
    report = validator.validate(params, shape)
    if report.has_errors():
        raise PolicyValidationError(report.generate_report())
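
# A minimal usage sketch; the 'Filter' shape name in the ec2 service
# model is an assumption for illustration, not verified against the
# current botocore models:
#
#   shape_validate({'Name': 'tag:Env', 'Values': ['dev']}, 'Filter', 'ec2')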


def get_bucket_url_with_region(bucket_url, region):
    parsed = urlparse.urlparse(bucket_url)
    s3_conf = parse_url_config(bucket_url)
    params = {}
    if region:
        params['region_name'] = region

    client = boto3.client('s3', **params)
    region = inspect_bucket_region(s3_conf.netloc, client.meta.endpoint_url)
    if not region:
        raise ValueError(f"could not determine region for output bucket, use explicit ?region=region_name. {s3_conf.url}") # noqa
    query = f"region={region}"
    if parsed.query:
        query = parsed.query + f"&region={region}"
    parts = parsed._replace(
        path=parsed.path.strip("/"),
        query=query
    )
    return urlparse.urlunparse(parts)
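
# An illustrative transformation (bucket name and the resolved region
# value here are hypothetical):
#
#   get_bucket_url_with_region('s3://my-bucket/logs', None)
#   -> 's3://my-bucket/logs?region=us-east-2'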


def inspect_bucket_region(bucket, s3_endpoint, allow_public=False):
    """Attempt to determine a bucket region without a client

    We can make an unauthenticated HTTP HEAD request to S3 in an attempt to find a bucket's
    region. This avoids some issues with cross-account/cross-region uses of the
    GetBucketLocation or HeadBucket API action. Because bucket names are unique within
    AWS partitions, we can make requests to a single regional S3 endpoint
    and get redirected if a bucket lives in another region within the
    same partition.

    This approach is inspired by some sample code from a Go SDK issue comment,
    which @sean-zou mentioned in #7593:

    https://github.com/aws/aws-sdk-go/issues/720#issuecomment-613038544

    Return a region string, or None if we're unable to determine one.
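
    Usage (a sketch; the endpoint value is an assumption)::

        region = inspect_bucket_region(
            'example-bucket', 'https://s3.us-east-1.amazonaws.com')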
    """
    region = None
    s3_endpoint_parts = urlparse.urlparse(s3_endpoint)
    # Use a "path-style" S3 URL here to avoid failing TLS certificate validation
    # on buckets with a dot in the name.
    #
    # According to the following blog post, before deprecating path-style
    # URLs AWS will provide a way for virtual-hosted-style URLs to handle
    # buckets with dots in their names. Using path-style URLs here in
    # the meantime seems reasonable, compared to alternatives like forcing
    # HTTP or ignoring certificate validation.
    #
    # https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
    bucket_endpoint = f'https://{s3_endpoint_parts.netloc}/{bucket}'
    request = Request(bucket_endpoint, method='HEAD')
    try:
        # For private buckets the head request will always raise an
        # http error, the status code and response headers provide
        # context for where the bucket is. For public buckets we
        # default to raising an exception, as a publicly accessible
        # bucket is an unsuitable location, at least for the output
        # use case.
        #
        # Dynamic use of urllib trips up static analyzers because of
        # the potential to accidentally allow unexpected schemes like
        # file:/. Here we're hardcoding the https scheme, so we can
        # ignore those specific checks.
        #
        # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected # noqa
        response = url_socket_retry(urlopen, request) # nosec B310
        # A successful response indicates a publicly accessible bucket
        # in the same region.
        region = response.headers.get('x-amz-bucket-region')

        if not allow_public:
            raise ValueError(f"bucket: '{bucket}' is publicly accessible")
    except HTTPError as err:
        # Returns 404 'Not Found' for buckets that don't exist
        if err.status == 404:
            raise ValueError(f"bucket '{bucket}' does not exist")
        # Permission errors (403) or redirects (301) for valid buckets
        # should still contain a header we can use to determine the
        # bucket region. Permission errors are indicative of the correct
        # region, while redirects are for cross region.
        region = err.headers.get('x-amz-bucket-region')

    return region


def url_socket_retry(func, *args, **kw):
    """retry a urllib operation in the event of certain errors.

    we want to retry on some common issues for cases where we are
    connecting through an intermediary proxy or where the downstream
    is overloaded.

    socket errors
    - 104 - Connection reset by peer
    - 110 - Connection timed out

    http errors
    - 503 - Slow Down | Service Unavailable
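
    Usage::

        response = url_socket_retry(urlopen, request)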
    """
    min_delay = 1
    max_delay = 32
    max_attempts = 4

    for idx, delay in enumerate(
            backoff_delays(min_delay, max_delay, jitter=True)):
        try:
            return func(*args, **kw)
        except HTTPError as err:
            if not (err.status == 503 and 'Slow Down' in err.reason):
                raise
            if idx == max_attempts - 1:
                raise
        except URLError as err:
            if not isinstance(err.reason, socket.error):
                raise
            if err.reason.errno not in (104, 110):
                raise
            if idx == max_attempts - 1:
                raise

        time.sleep(delay)


class Arn(namedtuple('_Arn', (
        'arn', 'partition', 'service', 'region',
        'account_id', 'resource', 'resource_type', 'separator'))):

    __slots__ = ()

    def __repr__(self):
        return "<arn:%s:%s:%s:%s:%s%s%s>" % (
            self.partition,
            self.service,
            self.region,
            self.account_id,
            self.resource_type,
            self.separator,
            self.resource)

    @classmethod
    def parse(cls, arn):
        if isinstance(arn, Arn):
            return arn
        parts = arn.split(':', 5)
        if len(parts) < 3:
            raise ValueError("Invalid Arn")
        # a few resources use qualifiers without specifying type
        if parts[2] in ('s3', 'apigateway', 'execute-api', 'emr-serverless'):
            parts.append(None)
            parts.append(None)
        elif '/' in parts[-1]:
            parts.extend(reversed(parts.pop(-1).split('/', 1)))
            parts.append('/')
        elif ':' in parts[-1]:
            parts.extend(reversed(parts.pop(-1).split(':', 1)))
            parts.append(':')
        elif len(parts) == 6:
            parts.append('')
            parts.append('')
        # replace the literal 'arn' string with raw arn
        parts[0] = arn
        return cls(*parts)

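# An illustrative parse (values are hypothetical):
#
#   Arn.parse('arn:aws:ec2:us-east-1:123456789012:instance/i-0abc')
#   -> partition='aws', service='ec2', region='us-east-1',
#      account_id='123456789012', resource_type='instance',
#      resource='i-0abc', separator='/'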

class ArnResolver:

    def __init__(self, manager):
        self.manager = manager

    def resolve(self, arns):
        arns = map(Arn.parse, arns)
        a_service = operator.attrgetter('service')
        a_resource = operator.attrgetter('resource_type')
        kfunc = lambda a: (a_service(a), a_resource(a)) # noqa
        arns = sorted(arns, key=kfunc)
        results = {}
        for (service, arn_type), arn_set in itertools.groupby(arns, key=kfunc):
            arn_set = list(arn_set)
            rtype = ArnResolver.resolve_type(arn_set[0])
            rmanager = self.manager.get_resource_manager(rtype)
            if rtype == 'sns':
                resources = rmanager.get_resources(
                    [rarn.arn for rarn in arn_set])
            else:
                resources = rmanager.get_resources(
                    [rarn.resource for rarn in arn_set])
            for rarn, r in zip(rmanager.get_arns(resources), resources):
                results[rarn] = r

            for rarn in arn_set:
                if rarn.arn not in results:
                    results[rarn.arn] = None
        return results

    @staticmethod
    def resolve_type(arn):
        arn = Arn.parse(arn)

        # this would benefit from a class cache {service} -> rtypes
        for type_name, klass in AWS.resources.items():
            if type_name in ('rest-account', 'account') or klass.resource_type.arn is False:
                continue
            if arn.service != (klass.resource_type.arn_service or klass.resource_type.service):
                continue
            if (type_name in ('asg', 'ecs-task') and
                    "%s%s" % (klass.resource_type.arn_type, klass.resource_type.arn_separator)
                    in arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_type is not None and
                    klass.resource_type.arn_type == arn.resource_type):
                return type_name
            elif (klass.resource_type.arn_service == arn.service and
                    klass.resource_type.arn_type == ""):
                return type_name


@metrics_outputs.register('aws')
class MetricsOutput(Metrics):
    """Send metrics data to cloudwatch
    """

    permissions = ("cloudwatch:PutMetricData",)
    retry = staticmethod(utils.get_retry(('Throttling',)))

    def __init__(self, ctx, config=None):
        super(MetricsOutput, self).__init__(ctx, config)
        self.namespace = self.config.get('namespace', DEFAULT_NAMESPACE)
        self.region = self.config.get('region')
        self.ignore_zero = self.config.get('ignore_zero')
        am = self.config.get('active_metrics')
        self.active_metrics = am and am.split(',')
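        # a metrics config of the form aws://master?region=... routes
        # metric data via the assume-role source ("master") account
        # credentials, see _put_metrics below.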
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def _format_metric(self, key, value, unit, dimensions):
        d = {
            "MetricName": key,
            "Timestamp": datetime.datetime.utcnow(),
            "Value": value,
            "Unit": unit}
        d["Dimensions"] = [
            {"Name": "Policy", "Value": self.ctx.policy.name},
            {"Name": "ResType", "Value": self.ctx.policy.resource_type}]
        for k, v in dimensions.items():
            # Skip legacy static dimensions if using new capabilities
            if (self.destination or self.region) and k == 'Scope':
                continue
            d['Dimensions'].append({"Name": k, "Value": v})
        if self.region:
            d['Dimensions'].append(
                {'Name': 'Region', 'Value': self.ctx.options.region})
        if self.destination:
            d['Dimensions'].append(
                {'Name': 'Account', 'Value': self.ctx.options.account_id or ''})
        return d

    def _put_metrics(self, ns, metrics):
        if self.destination == 'master':
            watch = self.ctx.session_factory(
                assume=False).client('cloudwatch', region_name=self.region)
        else:
            watch = utils.local_session(
                self.ctx.session_factory).client('cloudwatch', region_name=self.region)

        # NOTE optionally filter out metrics whose value is 0
        if self.ignore_zero in ['1', 'true', 'True']:
            metrics = [m for m in metrics if m.get("Value") != 0]
        # NOTE optionally restrict to the configured active metric names
        if self.active_metrics:
            metrics = [m for m in metrics if m["MetricName"] in self.active_metrics]
        if not metrics:
            return
        return self.retry(
            watch.put_metric_data, Namespace=ns, MetricData=metrics)


@log_outputs.register('aws')
class CloudWatchLogOutput(LogOutput):

    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'

    def __init__(self, ctx, config=None):
        super(CloudWatchLogOutput, self).__init__(ctx, config)
        if self.config['netloc'] == 'master' or not self.config['netloc']:
            self.log_group = self.config['path'].strip('/')
        else:
            # join netloc to path for casual usages of aws://log/group/name
            self.log_group = ("%s/%s" % (
                self.config['netloc'], self.config['path'].strip('/'))).strip('/')
        self.region = self.config.get('region', ctx.options.region)
        self.destination = (
            self.config.scheme == 'aws' and
            self.config.get('netloc') == 'master') and 'master' or None

    def construct_stream_name(self):
        if self.config.get('stream') is None:
            log_stream = self.ctx.policy.name
            if self.config.get('region') is not None:
                log_stream = "{}/{}".format(self.ctx.options.region, log_stream)
            if self.config.get('netloc') == 'master':
                log_stream = "{}/{}".format(self.ctx.options.account_id, log_stream)
        else:
            log_stream = self.config.get('stream').format(
                region=self.ctx.options.region,
                account=self.ctx.options.account_id,
                policy=self.ctx.policy.name,
                now=datetime.datetime.utcnow())
        return log_stream

    def get_handler(self):
        log_stream = self.construct_stream_name()
        params = dict(
            log_group=self.log_group, log_stream=log_stream,
            session_factory=(
                lambda x=None: self.ctx.session_factory(
                    region=self.region, assume=self.destination != 'master')))
        return CloudWatchLogHandler(**params)

    def __repr__(self):
        return "<%s to group:%s stream:%s>" % (
            self.__class__.__name__,
            self.ctx.options.log_group,
            self.ctx.policy.name)


class XrayEmitter:
    # implement https://github.com/aws/aws-xray-sdk-python/issues/51

    def __init__(self):
        self.buf = []
        self.client = None

    def send_entity(self, entity):
        self.buf.append(entity)
        if len(self.buf) > 49:
            self.flush()

    def flush(self):
        buf = self.buf
        self.buf = []
        for segment_set in utils.chunks(buf, 50):
            self.client.put_trace_segments(
                TraceSegmentDocuments=[s.serialize() for s in segment_set])


class XrayContext(Context):
    """Specialized XRay Context for Custodian.

    A context is used as a segment storage stack for currently in
    progress segments.

    We use a customized context for custodian as policy execution
    commonly uses a concurrent.futures thread pool pattern during
    execution for api concurrency. Default xray semantics would use
    thread local storage and treat each of those threads as a separate
    trace execution. We want to aggregate/associate all thread pool api
    executions to the custodian policy execution. The xray sdk supports
    this via manual instrumentation of every thread pool usage, but we
    don't want to explicitly couple xray integration everywhere across
    the codebase. Instead we use a context that is aware of custodian's
    usage of threads and associates subsegments therein to the policy
    execution's active subsegment.
    """

    def __init__(self, *args, **kw):
        super(XrayContext, self).__init__(*args, **kw)
        self._local = Bag()
        self._current_subsegment = None
        self._main_tid = threading.get_ident()

    def handle_context_missing(self):
        """Custodian has a few api calls out of band of policy execution.

        - Resolving account alias.
        - Cloudwatch Log group/stream discovery/creation (when using -l on cli)

        Also we want xray usage to be optional, enabled via configuration,
        so we default to disabling the context-missing output.
        """

    # Annotate any segments/subsegments with their thread ids.
    def put_segment(self, segment):
        if getattr(segment, 'thread_id', None) is None:
            segment.thread_id = threading.get_ident()
        super().put_segment(segment)

    def put_subsegment(self, subsegment):
        if getattr(subsegment, 'thread_id', None) is None:
            subsegment.thread_id = threading.get_ident()
        super().put_subsegment(subsegment)

    # Override since we're not just popping the end of the stack, we're removing
    # the thread subsegment from the array by identity.
    def end_subsegment(self, end_time):
        subsegment = self.get_trace_entity()
        if self._is_subsegment(subsegment):
            subsegment.close(end_time)
            self._local.entities.remove(subsegment)
            return True
        else:
            log.warning("No subsegment to end.")
            return False

    # Override get trace identity, any worker thread will find its own subsegment
    # on the stack, else will use the main thread's sub/segment
    def get_trace_entity(self):
        tid = threading.get_ident()
        entities = self._local.get('entities', ())
        for s in reversed(entities):
            if s.thread_id == tid:
                return s
            # custodian main thread won't advance (create new segment)
            # with worker threads still doing pool work.
            elif s.thread_id == self._main_tid:
                return s
        return self.handle_context_missing()


@tracer_outputs.register('xray', condition=HAVE_XRAY)
class XrayTracer:

    emitter = XrayEmitter()

    in_lambda = 'LAMBDA_TASK_ROOT' in os.environ
    use_daemon = 'AWS_XRAY_DAEMON_ADDRESS' in os.environ
    service_name = 'custodian'

    @classmethod
    def initialize(cls, config):
        context = XrayContext()
        sampling = config.get('sample', 'true') == 'true'
        xray_recorder.configure(
            emitter=cls.use_daemon is False and cls.emitter or None,
            context=context,
            sampling=sampling,
            context_missing='LOG_ERROR')
        patch(['boto3', 'requests'])
        logging.getLogger('aws_xray_sdk.core').setLevel(logging.ERROR)

    def __init__(self, ctx, config):
        self.ctx = ctx
        self.config = config or {}
        self.client = None
        self.metadata = {}

    @contextlib.contextmanager
    def subsegment(self, name):
        segment = xray_recorder.begin_subsegment(name)
        try:
            yield segment
        except Exception as e:
            stack = traceback.extract_stack(limit=xray_recorder.max_trace_back)
            segment.add_exception(e, stack)
            raise
        finally:
            xray_recorder.end_subsegment(time.time())

    def __enter__(self):
        if self.client is None:
            self.client = self.ctx.session_factory(assume=False).client('xray')

        self.emitter.client = self.client

        if self.in_lambda:
            self.segment = xray_recorder.begin_subsegment(self.service_name)
        else:
            self.segment = xray_recorder.begin_segment(
                self.service_name, sampling=True)

        p = self.ctx.policy
        xray_recorder.put_annotation('policy', p.name)
        xray_recorder.put_annotation('resource', p.resource_type)
        if self.ctx.options.account_id:
            xray_recorder.put_annotation('account', self.ctx.options.account_id)

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        metadata = self.ctx.get_metadata(('api-stats',))
        metadata.update(self.metadata)
        xray_recorder.put_metadata('custodian', metadata)
        if self.in_lambda:
            xray_recorder.end_subsegment()
            return
        xray_recorder.end_segment()
        if not self.use_daemon:
            self.emitter.flush()
        log.info(
            ('View XRay Trace https://console.aws.amazon.com/xray/home?region=%s#/'
             'traces/%s' % (self.ctx.options.region, self.segment.trace_id)))
        self.metadata.clear()


@api_stats_outputs.register('aws')
class ApiStats(DeltaStats):

    def __init__(self, ctx, config=None):
        super(ApiStats, self).__init__(ctx, config)
        self.api_calls = Counter()

    def get_snapshot(self):
        return dict(self.api_calls)

    def get_metadata(self):
        return self.get_snapshot()

    def __enter__(self):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers((self,))
        self.push_snapshot()

    def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
        if isinstance(self.ctx.session_factory, credentials.SessionFactory):
            self.ctx.session_factory.set_subscribers(())

        # With cached sessions, we need to unregister any events subscribers
        # on extant sessions to allow for the next registration.
        utils.local_session(self.ctx.session_factory).events.unregister(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

        self.ctx.metrics.put_metric(
            "ApiCalls", sum(self.api_calls.values()), "Count")
        self.pop_snapshot()

    def __call__(self, s):
        s.events.register(
            'after-call.*.*', self._record, unique_id='c7n-api-stats')

    def _record(self, http_response, parsed, model, **kwargs):
        self.api_calls["%s.%s" % (
            model.service_model.endpoint_prefix, model.name)] += 1


@blob_outputs.register('s3')
class S3Output(BlobOutput):
    """
    Usage:

    .. code-block:: python

        with S3Output(session_factory, 's3://bucket/prefix'):
            log.info('xyz') # -> log messages sent to custodian-run.log.gz

    """
    permissions = ('s3:PutObject',)

    def __init__(self, ctx, config):
        super().__init__(ctx, config)
        self._transfer = None

    @property
    def transfer(self):
        if self._transfer:
            return self._transfer
        bucket_region = self.config.region or None
        self._transfer = S3Transfer(
            self.ctx.session_factory(region=bucket_region, assume=False).client('s3'))
        return self._transfer

    def upload_file(self, path, key):
        self.transfer.upload_file(
            path, self.bucket, key,
            extra_args={
                'ACL': 'bucket-owner-full-control',
                'ServerSideEncryption': 'AES256'})


@clouds.register('aws')
class AWS(Provider):

    display_name = 'AWS'
    resource_prefix = 'aws'
    # legacy path for older plugins
    resources = PluginRegistry('resources')
    # import paths for resources
    resource_map = ResourceMap

    def initialize(self, options):
        """Initialize provider defaults: region, account id, and the
        output bucket region; optionally enable xray tracing.
        """
        _default_region(options)
        _default_account_id(options)
        _default_bucket_region(options)

        if options.tracer and options.tracer.startswith('xray') and HAVE_XRAY:
            XrayTracer.initialize(utils.parse_url_config(options.tracer))
        return options

    def get_session_factory(self, options):
        return SessionFactory(
            options.region,
            options.profile,
            options.assume_role,
            options.external_id,
            options.session_policy)

    def initialize_policies(self, policy_collection, options):
        """Return a set of policies targeted to the given regions.

        Supports symbolic regions like 'all'. This will automatically
        filter out policies if they are being targeted to a region that
        does not support the service. Global services will target a
        single region (us-east-1 if only 'all' is specified, else the
        first region in the list).

        Note for region partitions (govcloud and china) an explicit
        region from the partition must be passed in.
        """
        from c7n.policy import Policy, PolicyCollection
        policies = []
        service_region_map, resource_service_map = get_service_region_map(
            options.regions, policy_collection.resource_types, self.type)
        if 'all' in options.regions:
            enabled_regions = {
                r['RegionName'] for r in
                get_profile_session(options).client('ec2').describe_regions(
                    Filters=[{'Name': 'opt-in-status',
                              'Values': ['opt-in-not-required', 'opted-in']}]
                ).get('Regions')}
        for p in policy_collection:
            if 'aws.' in p.resource_type:
                _, resource_type = p.resource_type.split('.', 1)
            else:
                resource_type = p.resource_type
            available_regions = service_region_map.get(
                resource_service_map.get(resource_type), ())

            # it's a global service/endpoint, use the user provided region
            # or us-east-1.
            if not available_regions and options.regions:
                candidates = [r for r in options.regions if r != 'all']
                candidate = candidates and candidates[0] or 'us-east-1'
                svc_regions = [candidate]
            elif 'all' in options.regions:
                svc_regions = list(set(available_regions).intersection(enabled_regions))
            else:
                svc_regions = options.regions

            for region in svc_regions:
                if available_regions and region not in available_regions:
                    level = ('all' in options.regions and
                             logging.DEBUG or logging.WARNING)
                    # TODO: fixme
                    policy_collection.log.log(
                        level, "policy:%s resources:%s not available in region:%s",
                        p.name, p.resource_type, region)
                    continue
                options_copy = copy.copy(options)
                options_copy.region = str(region)

                if len(options.regions) > 1 or 'all' in options.regions and getattr(
                        options, 'output_dir', None):
                    options_copy.output_dir = join_output(options.output_dir, region)
                policies.append(
                    Policy(p.data, options_copy,
                           session_factory=policy_collection.session_factory()))

        return PolicyCollection(
            # order policies by region to minimize local session invalidation.
            # note relative ordering of policies must be preserved, python sort
            # is stable.
            sorted(policies, key=operator.attrgetter('options.region')),
            options)


def join_output(output_dir, suffix):
    if '{region}' in output_dir:
        return output_dir.rstrip('/')
    if output_dir.endswith('://'):
        return output_dir + suffix
    output_url_parts = urlparse.urlparse(output_dir)
    # for output urls, the end of the url may be a
    # query string. make sure we add a suffix to
    # the path component.
    output_url_parts = output_url_parts._replace(
        path=output_url_parts.path.rstrip('/') + '/%s' % suffix
    )
    return urlparse.urlunparse(output_url_parts)
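
# An illustrative join (values are hypothetical):
#
#   join_output('s3://bucket/prefix?region=us-east-1', 'us-west-2')
#   -> 's3://bucket/prefix/us-west-2?region=us-east-1'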


def fake_session():
    session = boto3.Session( # nosec nosemgrep
        region_name='us-east-1',
        aws_access_key_id='never',
        aws_secret_access_key='found')
    return session


def get_service_region_map(regions, resource_types, provider='aws'):
    # we're not interacting with the apis, just using the sdk's meta information.

    session = fake_session()
    normalized_types = []
    for r in resource_types:
        if r.startswith('%s.' % provider):
            normalized_types.append(r[len(provider) + 1:])
        else:
            normalized_types.append(r)
    resource_service_map = {
        r: clouds[provider].resources.get(r).resource_type.service
        for r in normalized_types if r != 'account'}
    # support for govcloud, china, and iso. We only utilize these regions if they
    # are explicitly passed in on the cli.
    partition_regions = {}
    for p in ('aws-cn', 'aws-us-gov', 'aws-iso'):
        for r in session.get_available_regions('s3', partition_name=p):
            partition_regions[r] = p

    partitions = ['aws']
    for r in regions:
        if r in partition_regions:
            partitions.append(partition_regions[r])

    service_region_map = {}
    for s in set(itertools.chain(resource_service_map.values())):
        for partition in partitions:
            service_region_map.setdefault(s, []).extend(
                session.get_available_regions(s, partition_name=partition))
    return service_region_map, resource_service_map
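
# An illustrative call (shape only; actual region lists vary by botocore
# version):
#
#   service_region_map, resource_service_map = get_service_region_map(
#       ['us-east-1'], ['aws.ec2'])
#   resource_service_map -> {'ec2': 'ec2'}
#   service_region_map['ec2'] -> ['af-south-1', 'ap-east-1', ...]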