Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/utils.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

635 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import copy 

4from collections import UserString 

5from datetime import datetime, timedelta 

6from dateutil.tz import tzutc 

7import json 

8import itertools 

9import ipaddress 

10import logging 

11import os 

12import random 

13import re 

14import sys 

15import threading 

16import time 

17from urllib import parse as urlparse 

18from urllib.request import getproxies, proxy_bypass 

19 

20from dateutil.parser import ParserError, parse 

21 

22import jmespath 

23from jmespath import functions 

24from jmespath.parser import Parser, ParsedResult 

25 

26from c7n import config 

27from c7n.exceptions import ClientError, PolicyValidationError 

28 

29# Try to play nice in a serverless environment, where we don't require yaml 

30 

31try: 

32 import yaml 

33except ImportError: # pragma: no cover 

34 SafeLoader = BaseSafeDumper = yaml = None 

35else: 

36 try: 

37 from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper 

38 except ImportError: # pragma: no cover 

39 from yaml import SafeLoader, SafeDumper as BaseSafeDumper 

40 

41 

class SafeDumper(BaseSafeDumper or object):
    """Safe yaml dumper that never emits anchors/aliases for shared objects."""

    def ignore_aliases(self, data):
        # Inline repeated objects rather than emitting yaml &anchor/*alias.
        return True

45 

46 

# Module-level logger shared by the utility helpers below.
log = logging.getLogger('custodian.utils')

48 

49 

class VarsSubstitutionError(Exception):
    """Raised by load_file when {name} variable substitution fails."""
    pass

52 

53 

def load_file(path, format=None, vars=None):
    """Load a yaml or json file, optionally expanding {name} variables.

    Format defaults to yaml, but a .json extension forces json.
    Raises VarsSubstitutionError when substitution fails.
    """
    fmt = 'yaml' if format is None else format
    if os.path.splitext(path)[1][1:] == 'json':
        fmt = 'json'

    with open(path) as fh:
        contents = fh.read()

    if vars:
        try:
            contents = contents.format(**vars)
        except IndexError:
            raise VarsSubstitutionError(
                'Failed to substitute variable by positional argument.')
        except KeyError as e:
            raise VarsSubstitutionError(
                'Failed to substitute variables. KeyError on {}'.format(str(e)))

    if fmt == 'json':
        return loads(contents)
    elif fmt == 'yaml':
        return yaml_load(contents)

78 

79 

def yaml_load(value):
    """Parse a yaml document with the safe loader."""
    if yaml is not None:
        return yaml.load(value, Loader=SafeLoader)
    raise RuntimeError("Yaml not available")

84 

85 

def yaml_dump(value):
    """Serialize value to yaml in block style without anchors."""
    if yaml is not None:
        return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)
    raise RuntimeError("Yaml not available")

90 

91 

def loads(body):
    """Deserialize a json string."""
    return json.loads(body)

94 

95 

def dumps(data, fh=None, indent=0):
    """Serialize data as json with the custodian encoder.

    Writes to the file handle when one is given, otherwise
    returns the serialized string.
    """
    if not fh:
        return json.dumps(data, cls=JsonEncoder, indent=indent)
    return json.dump(data, fh, cls=JsonEncoder, indent=indent)

101 

102 

def format_event(evt):
    """Pretty-print an event payload as two-space indented json."""
    return json.dumps(evt, indent=2)

105 

106 

def filter_empty(d):
    """Remove falsy-valued keys from d in place and return it.

    Note this strips 0/False/'' as well as None and empty containers.
    """
    for key in [k for k, v in d.items() if not v]:
        del d[key]
    return d

112 

113 

# We need a minimum floor when examining possible timestamp
# values to distinguish from other numeric time usages. Use
# the S3 Launch Date.
# NOTE(review): time.mktime interprets the tuple in *local* time, so the
# exact epoch value is host-timezone dependent; it is only used as a
# coarse lower bound by parse_date.
DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))

118 

119 

def parse_date(v, tz=None):
    """Handle various permutations of a datetime serialization
    to a datetime with the given timezone.

    Handles strings, seconds since epoch, and milliseconds since epoch.
    """

    if v is None:
        return v

    tz = tz or tzutc()

    if isinstance(v, datetime):
        if v.tzinfo is None:
            # naive datetime: astimezone() interprets it as local time
            # before converting. NOTE(review): presumably intentional,
            # confirm callers expect local-time interpretation here.
            return v.astimezone(tz)
        return v

    if isinstance(v, str) and not v.isdigit():
        # free-form string (e.g. iso8601); all-digit strings fall
        # through to the epoch handling below instead.
        try:
            return parse(v).astimezone(tz)
        except (AttributeError, TypeError, ValueError, OverflowError):
            pass

    # OSError on windows -- https://bugs.python.org/issue36439
    exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError)

    if isinstance(v, (int, float, str)):
        # try interpreting as seconds since epoch; DATE_FLOOR guards
        # against treating small numbers (ports, counts) as timestamps.
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v)).astimezone(tz)
        except exceptions:
            pass

    if isinstance(v, (int, float, str)):
        # try interpreting as milliseconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
        except exceptions:
            pass

    # anything unparseable yields None
    return isinstance(v, datetime) and v or None

162 

163 

def type_schema(
        type_name, inherits=None, rinherit=None,
        aliases=None, required=None, **props):
    """jsonschema generation helper

    params:
      - type_name: name of the type
      - inherits: list of document fragments that are required via anyOf[$ref]
      - rinherit: use another schema as a base for this, basically work around
        inherits issues with additionalProperties and type enums.
      - aliases: additional names this type maybe called
      - required: list of required properties, by default 'type' is required
      - props: additional key value properties
    """
    type_names = [type_name]
    if aliases:
        type_names.extend(aliases)

    if rinherit:
        s = copy.deepcopy(rinherit)
        s['properties']['type'] = {'enum': type_names}
    else:
        s = {
            'type': 'object',
            'properties': {'type': {'enum': type_names}}}

    # Ref based inheritance and additional properties don't mix well.
    # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
    if not inherits:
        s['additionalProperties'] = False

    s['properties'].update(props)

    # a prop explicitly passed as None is a request to drop it (rinherit use)
    for name in [k for k, v in props.items() if v is None]:
        del s['properties'][name]

    required = required or []
    if isinstance(required, list):
        required.append('type')
    s['required'] = required

    if inherits:
        s = {'allOf': [{'$ref': ref} for ref in inherits] + [s]}
    return s

213 

214 

class JsonEncoder(json.JSONEncoder):
    """json encoder handling datetimes, FormatDate wrappers and bytes."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FormatDate):
            # unwrap to the underlying datetime
            return obj.datetime.isoformat()
        if isinstance(obj, bytes):
            # lossy decode; undecodable bytes are dropped
            return obj.decode('utf8', errors="ignore")
        return super().default(obj)

225 

226 

def group_by(resources, key):
    """Return a mapping of key value to resources with the corresponding value.

    Key may be specified as dotted form for nested dictionary lookup
    """
    grouped = {}
    segments = key.split('.')
    for resource in resources:
        value = resource
        for segment in segments:
            value = value.get(segment)
            # stop descending once we hit a leaf (or a missing key)
            if not isinstance(value, dict):
                break
        grouped.setdefault(value, []).append(resource)
    return grouped

242 

243 

def chunks(iterable, size=50):
    """Break an iterable into lists of size"""
    buf = []
    for item in iterable:
        buf.append(item)
        if len(buf) % size == 0:
            yield buf
            buf = []
    # trailing partial chunk
    if buf:
        yield buf

254 

255 

def camelResource(obj, implicitDate=False, implicitTitle=True):
    """Some sources from apis return lowerCased where as describe calls

    always return TitleCase, this function turns the former to the later

    implicitDate ~ automatically sniff keys that look like isoformat date strings
    and convert to python datetime objects.

    Mutates obj (and nested dicts/lists) in place and returns it.
    """
    if not isinstance(obj, dict):
        return obj
    for k in list(obj.keys()):
        v = obj.pop(k)
        if implicitTitle:
            # uppercase only the first character; rest of key preserved
            ok = "%s%s" % (k[0].upper(), k[1:])
        else:
            ok = k
        obj[ok] = v

        if implicitDate:
            # config service handles datetime differently then describe sdks
            # the sdks use knowledge of the shape to support language native
            # date times, while config just turns everything into a serialized
            # json with mangled keys without type info. to normalize to describe
            # we implicitly sniff keys which look like datetimes, and have an
            # isoformat marker ('T').
            kn = k.lower()
            if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
                try:
                    dv = parse_date(v)
                except ParserError:
                    dv = None
                if dv:
                    obj[ok] = dv
        if isinstance(v, dict):
            # recurse into nested mappings in place
            camelResource(v, implicitDate, implicitTitle)
        elif isinstance(v, list):
            # recurse into each element of nested lists
            for e in v:
                camelResource(e, implicitDate, implicitTitle)
    return obj

295 

296 

def get_account_id_from_sts(session):
    """Return the aws account id for the session's credentials."""
    return session.client('sts').get_caller_identity().get('Account')

300 

301 

def get_account_alias_from_sts(session):
    """Return the first iam account alias, or '' when none is set."""
    found = session.client('iam').list_account_aliases().get(
        'AccountAliases', ())
    return found and found[0] or ''

306 

307 

def query_instances(session, client=None, **query):
    """Return a list of ec2 instances for the query.
    """
    if client is None:
        client = session.client('ec2')
    pages = client.get_paginator('describe_instances').paginate(**query)
    instances = []
    # flatten pages -> reservations -> instances
    for page in pages:
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    return instances

318 

319 

# Thread-local cache of sessions keyed by region name; see local_session.
CONN_CACHE = threading.local()

321 

322 

def local_session(factory, region=None):
    """Cache a session thread local for up to 45m

    :param factory: zero-arg callable producing a session; may carry a
        ``region`` attribute used as the default cache key.
    :param region: explicit cache-key override.
    """
    factory_region = getattr(factory, 'region', 'global')
    if region:
        factory_region = region
    s = getattr(CONN_CACHE, factory_region, {}).get('session')
    t = getattr(CONN_CACHE, factory_region, {}).get('time')

    n = time.time()
    # reuse the cached session while it is younger than 45 minutes
    if s is not None and t + (60 * 45) > n:
        return s
    s = factory()

    setattr(CONN_CACHE, factory_region, {'session': s, 'time': n})
    return s

338 

339 

def reset_session_cache():
    """Drop all thread-locally cached sessions (e.g. on credential change)."""
    for k in [k for k in dir(CONN_CACHE) if not k.startswith('_')]:
        setattr(CONN_CACHE, k, {})

    # deferred import to avoid an import cycle with c7n.credentials
    from .credentials import CustodianSession
    CustodianSession.close()

346 

347 

def annotation(i, k):
    """Return the annotation stored under k on resource i, () when absent."""
    return i.get(k, ())

350 

351 

def set_annotation(i, k, v):
    """Append annotation value(s) v under key k on dictionary i.

    Scalars are wrapped in a list; values accumulate across calls.
    NOTE: a pre-existing non-list value under k is left untouched.
    """
    if not isinstance(i, dict):
        raise ValueError("Can only annotate dictionaries")

    values = v if isinstance(v, list) else [v]

    if k not in i:
        i[k] = values
    else:
        existing = i[k]
        if isinstance(existing, list):
            existing.extend(values)

371 

372 

def parse_s3(s3_path):
    """Split an s3://bucket/prefix uri into (path, bucket, key_prefix).

    Returns the rstripped uri, the bucket name, and the key prefix with
    its leading '/' ('' when no key component was given).
    """
    if not s3_path.startswith('s3://'):
        raise ValueError("invalid s3 path")
    # first '/' after the 's3://' scheme marks the bucket/key boundary
    ridx = s3_path.find('/', 5)
    if ridx == -1:
        ridx = None
    bucket = s3_path[5:ridx]
    s3_path = s3_path.rstrip('/')
    if ridx is None:
        key_prefix = ""
    else:
        # NOTE(review): recomputed on the rstripped path -- an input like
        # 's3://bucket/' (bare trailing slash) yields a surprising
        # key_prefix; confirm callers never pass that form.
        key_prefix = s3_path[s3_path.find('/', 5):]
    return s3_path, bucket, key_prefix

386 

387 

# Regions living outside the default commercial 'aws' partition.
REGION_PARTITION_MAP = {
    'us-gov-east-1': 'aws-us-gov',
    'us-gov-west-1': 'aws-us-gov',
    'cn-north-1': 'aws-cn',
    'cn-northwest-1': 'aws-cn',
    'us-isob-east-1': 'aws-iso-b',
    'us-iso-east-1': 'aws-iso'
}


def get_partition(region):
    """Return the aws partition for a region, defaulting to 'aws'."""
    if region in REGION_PARTITION_MAP:
        return REGION_PARTITION_MAP[region]
    return 'aws'

400 

401 

def generate_arn(
        service, resource, partition='aws',
        region=None, account_id=None, resource_type=None, separator='/'):
    """Generate an Amazon Resource Name.
    See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
    """
    # region implies its partition, overriding the passed-in default
    if region and region in REGION_PARTITION_MAP:
        partition = REGION_PARTITION_MAP[region]
    if service == 's3':
        # s3 arns carry no region component
        region = ''
    arn = 'arn:%s:%s:%s:%s:' % (
        partition, service, region or '', account_id or '')
    if not resource_type:
        return arn + resource
    # avoid doubling the separator when the resource already leads with it
    sep = '' if resource.startswith(separator) else separator
    return '%s%s%s%s' % (arn, resource_type, sep, resource)

421 

422 

def snapshot_identifier(prefix, db_identifier):
    """Return an identifier for a snapshot of a database or cluster.

    Appends a minute-resolution local timestamp to prefix-dbid.
    """
    return "%s-%s-%s" % (
        prefix, db_identifier, datetime.now().strftime('%Y-%m-%d-%H-%M'))

428 

429 

# Logger used by get_retry for optional retry reporting.
retry_log = logging.getLogger('c7n.retry')

431 

432 

def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
    """Decorator for retry boto3 api call on transient errors.

    https://www.awsarchitectureblog.com/2015/03/backoff.html
    https://en.wikipedia.org/wiki/Exponential_backoff

    :param codes: A sequence of retryable error codes.
    :param max_attempts: The max number of retries, by default the delay
       time is proportional to the max number of attempts.
    :param log_retries: Whether we should log retries, if specified
       specifies the level at which the retry should be logged.
    :param _max_delay: The maximum delay for any retry interval *note*
       this parameter is only exposed for unit testing, as its
       derived from the number of attempts.

    Returns a function for invoking aws client calls that
    retries on retryable error codes.
    """
    # ceiling of the geometric series; backoff_delays stops past this
    max_delay = max(min_delay, 2) ** max_attempts

    def _retry(func, *args, ignore_err_codes=(), **kw):
        for idx, delay in enumerate(
                backoff_delays(min_delay, max_delay, jitter=True)):
            try:
                return func(*args, **kw)
            except ClientError as e:
                # errors the caller wants silently swallowed
                if e.response['Error']['Code'] in ignore_err_codes:
                    return
                elif e.response['Error']['Code'] not in retry_codes:
                    raise
                elif idx == max_attempts - 1:
                    # retry budget exhausted; surface the last error
                    raise
                if log_retries:
                    # log_retries doubles as the logging level to emit at
                    retry_log.log(
                        log_retries,
                        "retrying %s on error:%s attempt:%d last delay:%0.2f",
                        func, e.response['Error']['Code'], idx, delay)
            time.sleep(delay)
    return _retry

472 

473 

def backoff_delays(start, stop, factor=2.0, jitter=False):
    """Geometric backoff sequence w/ jitter

    Yields start, start*factor, ... up to and including the last value
    <= stop; jitter shaves off up to 20% of each delay.
    """
    delay = start
    while delay <= stop:
        if jitter:
            # randomize to de-synchronize concurrent clients
            yield delay - (delay * random.random() / 5)
        else:
            yield delay
        delay *= factor

484 

485 

def parse_cidr(value):
    """Process cidr ranges."""
    if isinstance(value, (list, set)):
        # parse each member; containment checks go through IPv4List
        return IPv4List([parse_cidr(item) for item in value])
    # bare addresses (no mask) parse as a plain ip address
    klass = IPv4Network if '/' in value else ipaddress.ip_address
    try:
        return klass(str(value))
    except (ipaddress.AddressValueError, ValueError):
        return None

498 

499 

class IPv4Network(ipaddress.IPv4Network):
    """IPv4Network whose `in` operator also supports net-in-net checks."""

    # Override for net 2 net containment comparison
    def __contains__(self, other):
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            # network-in-network: true when self fully covers other
            return self.supernet_of(other)
        return super(IPv4Network, self).__contains__(other)

    # supernet_of only exists on python >= 3.7; backfill it for 3.6
    if (sys.version_info.major == 3 and sys.version_info.minor <= 6):  # pragma: no cover
        @staticmethod
        def _is_subnet_of(a, b):
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (b.network_address <= a.network_address and
                        b.broadcast_address >= a.broadcast_address)
            except AttributeError:
                raise TypeError(f"Unable to test subnet containment "
                                f"between {a} and {b}")

        def supernet_of(self, other):
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)

526 

527 

class IPv4List:
    """Collection of parsed networks/addresses supporting `in` checks."""

    def __init__(self, ipv4_list):
        self.ipv4_list = ipv4_list

    def __contains__(self, other):
        # None never matches (e.g. an unparseable cidr)
        if other is None:
            return False
        for elem in self.ipv4_list:
            if isinstance(elem, IPv4Network):
                if other in elem:
                    return True
            elif isinstance(elem, ipaddress.IPv4Address):
                if other == elem:
                    return True
        return False

540 

541 

def reformat_schema(model):
    """ Reformat schema to be in a more displayable format. """
    if not hasattr(model, 'schema'):
        return "Model '{}' does not have a schema".format(model)

    if 'properties' not in model.schema:
        return "Schema in unexpected format."

    ret = copy.deepcopy(model.schema['properties'])
    # 'type' is schema plumbing, not a user-facing property
    ret.pop('type', None)

    # surface required-ness on the individual properties
    for key in model.schema.get('required', []):
        if key in ret:
            ret[key]['required'] = True

    return ret

560 

561 

# from botocore.utils avoiding runtime dependency for botocore for other providers.
# license apache 2.0
def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set a value in nested dicts addressed by a dotted expression.

    Limitations (a deliberately small jmespath subset):
      * Only handles dotted lookups
      * No offsets/wildcards/slices/etc.

    Missing intermediate keys are vivified as empty dicts.
    """
    current_key, _, remainder = expression.partition('.')

    if not current_key:
        raise ValueError(expression)

    if remainder:
        # descend, creating an empty dict for a brand-new key
        target = source.setdefault(current_key, {})
        return set_value_from_jmespath(target, remainder, value, is_first=False)

    # down to a single key: set it
    source[current_key] = value

592 

593 

def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
    """
    Format all string values in an object.
    Return the updated object

    Recurses through dicts and lists; each string leaf is expanded with
    str.format(*args, **kwargs), or with `formatter` when one is given.
    Strings whose expansion raises one of `err_fallback` are returned
    unchanged; non-string leaves pass through untouched.

    Fix: recursive calls previously re-passed positional format args as
    `(obj[key], formatter=formatter, *args, ...)`, which re-bound them to
    `err_fallback` (and collided with the `formatter` keyword), so
    positional format arguments were never forwarded to str.format. All
    parameters are now forwarded positionally in order.
    """
    if isinstance(obj, dict):
        return {
            key: format_string_values(
                value, err_fallback, formatter, *args, **kwargs)
            for key, value in obj.items()}
    elif isinstance(obj, list):
        return [
            format_string_values(item, err_fallback, formatter, *args, **kwargs)
            for item in obj]
    elif isinstance(obj, str):
        try:
            if formatter:
                return formatter(obj, *args, **kwargs)
            return obj.format(*args, **kwargs)
        except err_fallback:
            # leave unresolved template strings as-is
            return obj
    return obj

619 

620 

def parse_url_config(url):
    """Parse a scheme://netloc/path?k=v url into a config Bag.

    Query parameters are flattened to their first value; the raw url is
    kept under the 'url' key.
    """
    if url and '://' not in url:
        # allow bare scheme names like 'aws'
        url += "://"
    parsed = urlparse.urlparse(url)
    conf = config.Bag()
    for attr in ('scheme', 'netloc', 'path'):
        conf[attr] = getattr(parsed, attr)
    for param, values in urlparse.parse_qs(parsed.query).items():
        conf[param] = values[0]
    conf['url'] = url
    return conf

632 

633 

def join_output_path(output_path, *parts):
    """Append path segments to an output location.

    Paths containing '{' are interpolation templates and returned as-is;
    plain filesystem paths use os.path.join; urls get their path
    component extended while preserving scheme/netloc/query.
    """
    # allow users to specify interpolated output paths
    if '{' in output_path:
        return output_path

    if "://" not in output_path:
        return os.path.join(output_path, *parts)

    # handle urls with query strings: only extend the path component
    parsed = urlparse.urlparse(output_path)
    pieces = list(parsed)
    pieces[2] = "/".join((parsed.path, *parts))
    return urlparse.urlunparse(pieces)

648 

649 

def get_policy_provider(policy_data):
    """Infer the cloud provider name from a policy's resource type."""
    resource = policy_data['resource']
    if isinstance(resource, list):
        provider, _ = resource[0].split('.', 1)
    elif '.' in resource:
        provider, _ = resource.split('.', 1)
    else:
        # unprefixed resource types default to aws by convention
        provider = 'aws'
    return provider

658 

659 

def get_proxy_url(url):
    """Return the environment-configured proxy url for `url`.

    Consults http_proxy/https_proxy/all_proxy style settings; returns
    None when no proxy matches or the host is covered by no_proxy.
    """
    parsed = urlparse.urlparse(url)
    proxies = getproxies()

    # most-specific to least-specific proxy settings
    candidates = [
        parsed.scheme + '://' + parsed.netloc,
        parsed.scheme,
        'all://' + parsed.netloc,
        'all'
    ]

    # Set port if not defined explicitly in url.
    port = parsed.port
    if port is None:
        if parsed.scheme == 'http':
            port = 80
        elif parsed.scheme == 'https':
            port = 443

    hostname = parsed.hostname or ''

    # Determine if proxy should be used based on no_proxy entries.
    # Note this does not support no_proxy ip or cidr entries.
    if proxy_bypass("%s:%s" % (hostname, port)):
        return None

    for candidate in candidates:
        if candidate in proxies:
            return proxies[candidate]

    return None

690 

691 

class DeferredFormatString(UserString):
    """A string that returns itself when formatted

    Let any format spec pass through. This lets us selectively defer
    expansion of runtime variables without losing format spec details.
    """
    def __format__(self, format_spec):
        # re-emit as an un-expanded template, keeping any spec intact
        spec = f":{format_spec}" if format_spec else ""
        return "{" + self.data + spec + "}"

700 

701 

class FormatDate:
    """a datetime wrapper with extended pyformat syntax

    Format specs may carry trailing increments like '+30M', '+2h' or
    '+1d' which offset the wrapped datetime before formatting.
    """

    date_increment = re.compile(r'\+[0-9]+[Mdh]')

    def __init__(self, d=None):
        self._d = d

    def __str__(self):
        return str(self._d)

    @property
    def datetime(self):
        # the wrapped datetime instance
        return self._d

    @classmethod
    def utcnow(cls):
        return cls(datetime.utcnow())

    def __getattr__(self, k):
        # delegate everything else (year, strftime, ...) to the datetime
        return getattr(self._d, k)

    def __format__(self, fmt=None):
        value = self._d
        offsets = self.date_increment.findall(fmt)
        units = {'M': 'minutes', 'h': 'hours', 'd': 'days'}
        for token in offsets:
            # token like '+2h': last char is the unit, middle the count
            value = value + timedelta(**{units[token[-1]]: float(token[1:-1])})
        if offsets:
            fmt = self.date_increment.sub("", fmt)
        return value.__format__(fmt)

739 

740 

class QueryParser:
    """Validator for api 'Filters'-style query lists.

    Subclasses customize QuerySchema (allowed keys mapped to a value type
    or tuple of allowed literals), type_name (for error messages), and
    multi_value/value_key to match the target api's filter shape.
    """

    # key -> expected value type, or tuple of allowed literal values
    QuerySchema = {}
    # used as a prefix in validation error messages
    type_name = ''
    # whether a filter's values may be a list
    multi_value = True
    # api-specific dict key holding the filter's values
    value_key = 'Values'

    @classmethod
    def parse(cls, data):
        # Validates `data` as a list of filter dicts, returning it
        # unchanged; raises PolicyValidationError on any problem.
        filters = []
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                "%s Query invalid format, must be array of dicts %s" % (
                    cls.type_name,
                    data))
        for d in data:
            if not isinstance(d, dict):
                raise PolicyValidationError(
                    "%s Query Filter Invalid %s" % (cls.type_name, data))
            if "Name" not in d or cls.value_key not in d:
                raise PolicyValidationError(
                    "%s Query Filter Invalid: Missing Key or Values in %s" % (
                        cls.type_name, data))

            key = d['Name']
            values = d[cls.value_key]

            if not cls.multi_value and isinstance(values, list):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Key: Value:%s Must be single valued" % (
                        cls.type_name, key))
            elif not cls.multi_value:
                # normalize scalars to a list for uniform checks below
                values = [values]

            # tag:<Name> keys are always allowed regardless of schema
            if key not in cls.QuerySchema and not key.startswith('tag:'):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Key:%s Valid: %s" % (
                        cls.type_name, key, ", ".join(cls.QuerySchema.keys())))

            vtype = cls.QuerySchema.get(key)
            if vtype is None and key.startswith('tag'):
                vtype = str

            if not isinstance(values, list):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Values, must be array %s" % (
                        cls.type_name, data,))

            for v in values:
                # a tuple vtype is an enum of allowed literal values;
                # anything else is treated as the required python type
                if isinstance(vtype, tuple):
                    if v not in vtype:
                        raise PolicyValidationError(
                            "%s Query Filter Invalid Value: %s Valid: %s" % (
                                cls.type_name, v, ", ".join(vtype)))
                elif not isinstance(v, vtype):
                    raise PolicyValidationError(
                        "%s Query Filter Invalid Value Type %s" % (
                            cls.type_name, data,))

            filters.append(d)

        return filters

803 

804 

def get_annotation_prefix(s):
    """Return the custodian annotation key for s, e.g. 'c7n:matched'."""
    return f'c7n:{s}'

807 

808 

def merge_dict_list(dict_iter):
    """take an list of dictionaries and merge them.

    last dict wins/overwrites on keys.
    """
    merged = {}
    for mapping in dict_iter:
        merged.update(mapping)
    return merged

818 

819 

def merge_dict(a, b):
    """Perform a merge of dictionaries A and B

    Any subdictionaries will be recursively merged.
    Any leaf elements in the form of scalar will use the value from B.
    If A is a str and B is a list, A will be inserted into the front of the list.
    If A is a list and B is a str, B will be appended to the list.
    If there are two lists for the same key, the lists will be merged
    deduplicated with values in A first, followed by any additional values from B.
    """
    merged = copy.deepcopy(a)
    for key, bval in b.items():
        if key not in merged:
            merged[key] = bval
            continue
        aval = merged[key]
        if isinstance(aval, dict) and isinstance(bval, dict):
            merged[key] = merge_dict(aval, bval)
        elif isinstance(aval, list) and isinstance(bval, list):
            # dedup while preserving a-first ordering
            for item in bval:
                if item not in aval:
                    aval.append(item)
        elif isinstance(bval, str) and isinstance(aval, list):
            if bval not in aval:
                aval.append(bval)
        elif isinstance(bval, list) and isinstance(aval, str):
            if aval in bval:
                merged[key] = bval
            else:
                fused = [aval]
                fused.extend(bval)
                merged[key] = fused
        elif isinstance(bval, (int, str, float, bool)):
            merged[key] = bval
        else:
            raise Exception(f"k={key}, {type(bval)} and {type(merged[key])} not conformable.")
    return merged

856 

857 

def compare_dicts_using_sets(a, b) -> bool:
    """Compares two dicts and replaces any lists or strings with sets

    Compares any lists in the dict as sets.

    NOTE(review): mutates `a` in place, replacing its list/str values
    with sets as it goes -- confirm callers do not reuse `a` afterwards.
    """

    if a.keys() != b.keys():
        return False

    for k, v in b.items():
        # normalize the list/str sides to sets so ordering is ignored
        if isinstance(v, list):
            v = format_to_set(v)
            if isinstance(a[k], str):
                a[k] = format_to_set(a[k])
        if isinstance(a[k], list):
            a[k] = format_to_set(a[k])
            if isinstance(v, str):
                v = format_to_set(v)
        if isinstance(a[k], dict) and isinstance(v, dict):
            # recurse; a matching sub-dict skips the equality check below
            if compare_dicts_using_sets(a[k], v):
                continue
        if v != a[k]:
            return False
    return True

882 

883 

def format_to_set(x) -> set:
    """Formats lists and strings to sets.

    Strings return as a set with one string.
    Lists return as a set.
    Variables of other datatypes will return as the original datatype.
    """
    if isinstance(x, str):
        return {x}
    if isinstance(x, list):
        return set(x)
    return x

897 

898 

def format_dict_with_sets(x: dict) -> dict:
    """Formats string and list values in a dict to sets.

    Any string value returns as a set with one string.
    Any list values return as a set.
    Returns a formatted dict; non-dict inputs pass through unchanged.
    """
    if not isinstance(x, dict):
        return x
    return {
        key: (format_dict_with_sets(value) if isinstance(value, dict)
              else format_to_set(value))
        for key, value in x.items()}

916 

917 

def select_keys(d, keys):
    """Project d onto keys; keys missing from d map to None."""
    return {k: d.get(k) for k in keys}

923 

924 

def get_human_size(size, precision=2):
    """Render a byte count as a human readable string, e.g. '2.00 KB'."""
    # interesting discussion on 1024 vs 1000 as base
    # https://en.wikipedia.org/wiki/Binary_prefix
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    idx = 0
    while size > 1024:
        idx += 1
        size /= 1024.0

    return "%.*f %s" % (precision, size, suffixes[idx])

935 

936 

def get_support_region(manager):
    # support is a unique service in that it doesnt support regional endpoints
    # thus, we need to construct the client based off the regions found here:
    # https://docs.aws.amazon.com/general/latest/gr/awssupport.html
    #
    # aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html
    partition_home = {
        'aws': 'us-east-1',
        'aws-us-gov': 'us-gov-west-1',
        'aws-cn': 'cn-north-1',
    }
    # unknown partitions yield None, matching the support api availability
    return partition_home.get(get_partition(manager.config.region))

955 

956 

def get_resource_tagging_region(resource_type, region):
    """Return the region to use for resource-group tagging api calls.

    Global resources only report tags from their partition's home region
    (us-east-1 for aws, us-gov-west-1 for govcloud); regional resources
    and other partitions keep the caller's region.
    """
    partition = get_partition(region)
    home = {'aws': 'us-east-1', 'aws-us-gov': 'us-gov-west-1'}.get(partition)
    if home and getattr(resource_type, 'global_resource', None):
        return home
    return region

967 

968 

def get_eni_resource_type(eni):
    """Classify the resource type that owns an elastic network interface.

    Uses the attachment instance id (ec2) or well-known description
    markers written by aws services; returns 'unknown' when nothing
    matches. Prefix rule order matters ('ELB app/' before the bare 'ELB'
    fallback, 'AWS Lambda VPC ENI' before 'AWS Lambda VPC').
    """
    attachment = eni.get('Attachment') or {}
    # EC2: an attached instance id trumps any description marker
    if attachment.get('InstanceId'):
        return 'ec2'

    description = eni.get('Description')

    # descriptions that must match exactly
    exact = {
        'ENI managed by APIGateway': 'apigw',
        'DMSNetworkInterface': 'dms',
        'RDSNetworkInterface': 'rds',
        'RedshiftNetworkInterface': 'redshift',
    }
    if description in exact:
        return exact[description]

    # prefix rules, checked in order
    prefixes = (
        ('ELB app/', 'elb-app'),
        ('ELB net/', 'elb-net'),
        ('ELB gwy/', 'elb-gwy'),
        ('ELB', 'elb'),
        ('AWS CodeStar Connections', 'codestar'),
        ('DAX', 'dax'),
        ('AWS created network interface for directory', 'dir'),
        ('arn:aws:ecs:', 'ecs'),
        ('EFS mount target for', 'fsmt'),
        ('ElastiCache', 'elasticache'),
        ('AWS ElasticMapReduce', 'emr'),
        ('CloudHSM Managed Interface', 'hsm'),
        ('CloudHsm ENI', 'hsmv2'),
        ('AWS Lambda VPC ENI', 'lambda'),
        ('AWS Lambda VPC', 'lambda'),
        ('Interface for NAT Gateway', 'nat'),
        ('Network interface for DBProxy', 'rds'),
        ('Network Interface for Transit Gateway Attachment', 'tgw'),
        ('VPC Endpoint Interface', 'vpce'),
        ('aws-k8s-branch-eni', 'eks'),
    )
    for marker, rtype in prefixes:
        if description.startswith(marker):
            return rtype
    return 'unknown'

1030 

1031 

class C7NJmespathFunctions(functions.Functions):
    """Custom jmespath functions available in custodian expressions."""

    @functions.signature(
        {'types': ['string']}, {'types': ['string']}
    )
    def _func_split(self, sep, string):
        # split(sep, str) -> list of segments
        return string.split(sep)

    @functions.signature(
        {'types': ['string']}
    )
    def _func_from_json(self, string):
        # from_json(str) -> parsed value, or null on invalid json
        try:
            return json.loads(string)
        except json.JSONDecodeError:
            return None

1048 

class C7NJMESPathParser(Parser):
    """Parser whose results default to custodian's custom functions."""

    def parse(self, expression):
        result = super().parse(expression)
        # re-wrap so search() picks up C7NJmespathFunctions by default
        return ParsedResultWithOptions(
            expression=result.expression,
            parsed=result.parsed
        )

1056 

1057 

class ParsedResultWithOptions(ParsedResult):
    """ParsedResult that searches with custodian's functions by default."""

    def search(self, value, options=None):
        # if options are explicitly passed in, we honor those
        if not options:
            options = jmespath.Options(custom_functions=C7NJmespathFunctions())
        return super().search(value, options)

1064 

1065 

def jmespath_search(*args, **kwargs):
    """jmespath.search with custodian's custom functions enabled."""
    opts = jmespath.Options(custom_functions=C7NJmespathFunctions())
    return jmespath.search(*args, options=opts, **kwargs)

1072 

1073 

def get_path(path: str, resource: dict):
    """
    Fetch a value from a resource by field name or dotted path.

    jmespath_search is comparatively expensive and id fields rarely
    contain a path, so plain names use direct dict indexing.

    :param path: the path or field name to fetch
    :param resource: the resource instance description
    :return: the field/path value
    """
    if '.' not in path:
        return resource[path]
    return jmespath_search(path, resource)

1088 

1089 

def jmespath_compile(expression):
    """Compile an expression with custodian's jmespath parser."""
    return C7NJMESPathParser().parse(expression)