Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/c7n/utils.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

718 statements  

1# Copyright The Cloud Custodian Authors. 

2# SPDX-License-Identifier: Apache-2.0 

3import copy 

4from collections import UserString 

5from datetime import datetime, timedelta 

6from dateutil.tz import tzutc 

7import json 

8import itertools 

9import ipaddress 

10import logging 

11import os 

12import random 

13import re 

14import sys 

15import threading 

16import time 

17from urllib import parse as urlparse 

18from urllib.request import getproxies, proxy_bypass 

19 

20from dateutil.parser import ParserError, parse 

21 

22import importlib.resources 

23import jmespath 

24from jmespath import functions 

25from jmespath.parser import Parser, ParsedResult 

26 

27from c7n import config 

28from c7n.exceptions import ClientError, PolicyValidationError 

29 

30# Try to play nice in a serverless environment, where we don't require yaml 

31 

try:
    # Optional dependency: serverless deployments may omit PyYAML entirely.
    import yaml
except ImportError:  # pragma: no cover
    # Sentinel Nones let yaml_load/yaml_dump raise a clear RuntimeError later.
    SafeLoader = BaseSafeDumper = yaml = None
else:
    try:
        # Prefer the C-accelerated (libyaml) loader/dumper when available.
        from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper
    except ImportError:  # pragma: no cover
        # Fall back to the pure-python implementations.
        from yaml import SafeLoader, SafeDumper as BaseSafeDumper

41 

42 

class SafeDumper(BaseSafeDumper or object):
    """Yaml dumper that inlines repeated values instead of emitting anchors.

    Inherits ``object`` when yaml is unavailable so that this class
    definition still succeeds at import time.
    """
    def ignore_aliases(self, data):
        # Always inline duplicated objects rather than using yaml aliases.
        return True

46 

47 

# Module-level logger for these utility helpers.
log = logging.getLogger('custodian.utils')

49 

50 

class VarsSubstitutionError(Exception):
    """Raised when variable substitution over a policy file fails."""
    pass

53 

54 

def load_file(path, format=None, vars=None):
    """Load a yaml or json file as a python data structure.

    :param path: file path; a ``.json`` extension forces json parsing
        when *format* is not given, otherwise yaml is assumed.
    :param format: explicit 'yaml' or 'json'.
    :param vars: optional mapping applied to the raw contents via
        ``str.format`` before parsing.
    :raises VarsSubstitutionError: when a substitution reference is invalid.
    :returns: the parsed contents, or None for an unrecognized format.
    """
    if format is None:
        format = 'yaml'
        _, ext = os.path.splitext(path)
        if ext[1:] == 'json':
            format = 'json'

    with open(path) as fh:
        contents = fh.read()

        if vars:
            try:
                contents = contents.format(**vars)
            except IndexError as e:
                msg = 'Failed to substitute variable by positional argument.'
                # chain the original error for easier debugging
                raise VarsSubstitutionError(msg) from e
            except KeyError as e:
                msg = 'Failed to substitute variables. KeyError on {}'.format(str(e))
                raise VarsSubstitutionError(msg) from e

        if format == 'yaml':
            return yaml_load(contents)
        elif format == 'json':
            return loads(contents)

79 

80 

def yaml_load(value):
    """Deserialize yaml text with the safe loader; error if yaml missing."""
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.load(value, Loader=SafeLoader)

85 

86 

def yaml_dump(value):
    """Serialize *value* as block-style yaml; error if yaml missing."""
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)

91 

92 

def loads(body):
    """Deserialize a json string into python data."""
    return json.loads(body)

95 

96 

def dumps(data, fh=None, indent=0):
    """Serialize *data* as json, writing to *fh* when one is given."""
    if not fh:
        return json.dumps(data, cls=JsonEncoder, indent=indent)
    return json.dump(data, fh, cls=JsonEncoder, indent=indent)

102 

103 

def format_event(evt):
    """Pretty-print an event payload for logging."""
    return json.dumps(evt, indent=2)

106 

107 

def filter_empty(d):
    """Remove falsy-valued keys from *d* in place and return it."""
    for key in [k for k, v in d.items() if not v]:
        del d[key]
    return d

113 

114 

# We need a minimum floor when examining possible timestamp
# values to distinguish from other numeric time usages. Use
# the S3 Launch Date.
DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))


def parse_date(v, tz=None):
    """Handle various permutations of a datetime serialization
    to a datetime with the given timezone.

    Handles strings, seconds since epoch, and milliseconds since epoch.

    :param v: datetime, date string, or epoch seconds/milliseconds
        (int, float, or digit string).
    :param tz: target tzinfo, defaults to utc.
    :returns: an aware datetime, or None when *v* can't be interpreted.
    """

    if v is None:
        return v

    tz = tz or tzutc()

    if isinstance(v, datetime):
        if v.tzinfo is None:
            # naive datetime, interpret in local time and convert.
            return v.astimezone(tz)
        return v

    if isinstance(v, str) and not v.isdigit():
        try:
            return parse(v).astimezone(tz)
        except (AttributeError, TypeError, ValueError, OverflowError):
            pass

    # OSError on windows -- https://bugs.python.org/issue36439
    exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError,)

    if isinstance(v, (int, float, str)):
        # try interpreting as a seconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v)).astimezone(tz)
        except exceptions:
            pass

    if isinstance(v, (int, float, str)):
        # try interpreting as milliseconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
        except exceptions:
            pass

    return v if isinstance(v, datetime) else None

163 

164 

def type_schema(
        type_name, inherits=None, rinherit=None,
        aliases=None, required=None, **props):
    """jsonschema generation helper

    params:
      - type_name: name of the type
      - inherits: list of document fragments that are required via anyOf[$ref]
      - rinherit: use another schema as a base for this, basically work around
        inherits issues with additionalProperties and type enums.
      - aliases: additional names this type maybe called
      - required: list of required properties, by default 'type' is required
      - props: additional key value properties
    """
    type_names = [type_name]
    if aliases:
        type_names.extend(aliases)

    if rinherit:
        s = copy.deepcopy(rinherit)
        s['properties']['type'] = {'enum': type_names}
    else:
        s = {
            'type': 'object',
            'properties': {
                'type': {'enum': type_names}}}

    # Ref based inheritance and additional properties don't mix well.
    # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
    if not inherits:
        s['additionalProperties'] = False

    s['properties'].update(props)

    # a None prop value is a request to drop an inherited property
    for k, v in props.items():
        if v is None:
            del s['properties'][k]
    if not required:
        required = []
    if isinstance(required, list):
        # copy rather than append so the caller's list is never mutated
        # across repeated invocations.
        required = required + ['type']
    s['required'] = required
    if inherits:
        extended = s
        s = {'allOf': [{'$ref': i} for i in inherits]}
        s['allOf'].append(extended)
    return s

214 

215 

class JsonEncoder(json.JSONEncoder):
    """Json encoder aware of datetimes, FormatDate wrappers, and bytes."""

    def default(self, obj):
        # datetimes (and our FormatDate wrapper) serialize as iso8601 text.
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FormatDate):
            return obj.datetime.isoformat()
        # best effort decode for raw byte payloads.
        if isinstance(obj, bytes):
            return obj.decode('utf8', errors="ignore")
        return super().default(obj)

226 

227 

def group_by(resources, key):
    """Return a mapping of key value to resources with the corresponding value.

    Key may be specified as dotted form for nested dictionary lookup
    """
    grouped = {}
    path = key.split('.')
    for resource in resources:
        value = resource
        for segment in path:
            value = value.get(segment)
            # stop descending once we hit a leaf (or a missing key).
            if not isinstance(value, dict):
                break
        grouped.setdefault(value, []).append(resource)
    return grouped

243 

244 

def chunks(iterable, size=50):
    """Break an iterable into lists of size"""
    buf = []
    for item in iterable:
        buf.append(item)
        if len(buf) % size == 0:
            yield buf
            buf = []
    # emit any trailing partial chunk.
    if buf:
        yield buf

255 

256 

def camelResource(obj, implicitDate=False, implicitTitle=True):
    """Some sources from apis return lowerCased where as describe calls

    always return TitleCase, this function turns the former to the later

    implicitDate ~ automatically sniff keys that look like isoformat date strings
    and convert to python datetime objects.
    """
    # non-dict leaves are returned untouched; recursion bottoms out here.
    if not isinstance(obj, dict):
        return obj
    # iterate over a snapshot of keys since we mutate obj in place.
    for k in list(obj.keys()):
        v = obj.pop(k)
        if implicitTitle:
            # NOTE(review): assumes keys are non-empty strings; an empty
            # key would raise IndexError here -- confirm with callers.
            ok = "%s%s" % (k[0].upper(), k[1:])
        else:
            ok = k
        obj[ok] = v

        if implicitDate:
            # config service handles datetime differently then describe sdks
            # the sdks use knowledge of the shape to support language native
            # date times, while config just turns everything into a serialized
            # json with mangled keys without type info. to normalize to describe
            # we implicitly sniff keys which look like datetimes, and have an
            # isoformat marker ('T').
            kn = k.lower()
            if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
                try:
                    dv = parse_date(v)
                except ParserError:
                    dv = None
                # only replace the value when it parsed to a real datetime.
                if dv:
                    obj[ok] = dv
        # recurse into nested containers so the whole tree is normalized.
        if isinstance(v, dict):
            camelResource(v, implicitDate, implicitTitle)
        elif isinstance(v, list):
            for e in v:
                camelResource(e, implicitDate, implicitTitle)
    return obj

296 

297 

def get_account_id_from_sts(session):
    """Fetch the current aws account id via sts get-caller-identity."""
    return session.client('sts').get_caller_identity().get('Account')

301 

302 

def get_account_alias_from_sts(session):
    """Return the first iam account alias, or '' when none are set."""
    response = session.client('iam').list_account_aliases()
    aliases = response.get('AccountAliases', ())
    return aliases[0] if aliases else ''

307 

308 

def query_instances(session, client=None, **query):
    """Return a list of ec2 instances for the query.
    """
    if client is None:
        client = session.client('ec2')
    paginator = client.get_paginator('describe_instances')
    instances = []
    # flatten pages -> reservations -> instances.
    for page in paginator.paginate(**query):
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    return instances

319 

320 

# Thread-local cache of sessions, keyed by the factory's region.
CONN_CACHE = threading.local()


def local_session(factory, region=None):
    """Cache a session thread local for up to 45m"""
    cache_key = region if region else getattr(factory, 'region', 'global')
    entry = getattr(CONN_CACHE, cache_key, {})
    session = entry.get('session')
    created = entry.get('time')

    now = time.time()
    # reuse the cached session while it's younger than 45 minutes.
    if session is not None and created + (60 * 45) > now:
        return session
    session = factory()

    setattr(CONN_CACHE, cache_key, {'session': session, 'time': now})
    return session

339 

340 

def reset_session_cache():
    """Drop all cached thread-local sessions and close the shared session."""
    for name in [n for n in dir(CONN_CACHE) if not n.startswith('_')]:
        setattr(CONN_CACHE, name, {})

    # local import to avoid a circular dependency at module load.
    from .credentials import CustodianSession
    CustodianSession.close()

347 

348 

def annotation(i, k):
    """Return the annotation stored under *k*, or an empty tuple."""
    return i.get(k, ())

351 

352 

def set_annotation(i, k, v):
    """
    >>> x = {}
    >>> set_annotation(x, 'marker', 'a')
    >>> annotation(x, 'marker')
    ['a']
    """
    if not isinstance(i, dict):
        raise ValueError("Can only annotate dictionaries")

    values = v if isinstance(v, list) else [v]

    if k not in i:
        i[k] = values
        return
    existing = i.get(k)
    # only extend when the existing value is a list; otherwise leave as-is.
    if isinstance(existing, list):
        existing.extend(values)

372 

373 

def parse_s3(s3_path):
    """Split an s3:// url into (normalized path, bucket, key prefix).

    :param s3_path: url of the form ``s3://bucket[/key/prefix]``.
    :raises ValueError: when the path lacks the ``s3://`` scheme.
    """
    if not s3_path.startswith('s3://'):
        raise ValueError("invalid s3 path")
    ridx = s3_path.find('/', 5)
    if ridx == -1:
        ridx = None
    bucket = s3_path[5:ridx]
    s3_path = s3_path.rstrip('/')
    if ridx is None:
        key_prefix = ""
    else:
        # re-locate the separator after stripping trailing slashes; a bare
        # "s3://bucket/" has no key at all once normalized.
        sidx = s3_path.find('/', 5)
        key_prefix = s3_path[sidx:] if sidx != -1 else ""
    return s3_path, bucket, key_prefix

387 

388 

# Mapping of aws region name -> partition (aws, aws-cn, aws-us-gov, ...),
# loaded from package data so it can be refreshed without code changes.
REGION_PARTITION_MAP = json.loads(
    (importlib.resources.files('c7n') / 'data/aws_region_partition_map.json').read_text()
)

392 

393 

def get_partition(region):
    """Map an aws region to its partition; '' for a falsy region."""
    return REGION_PARTITION_MAP.get(region, 'aws') if region else ''

398 

399 

def generate_arn(
        service, resource, partition='aws',
        region=None, account_id=None, resource_type=None, separator='/'):
    """Generate an Amazon Resource Name.
    See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
    """
    if region:
        partition = get_partition(region)
    # s3 arns are global and omit the region component.
    if service == 's3':
        region = ''
    arn = 'arn:%s:%s:%s:%s:' % (
        partition, service, region or '', account_id or '')
    if not resource_type:
        return arn + resource
    # avoid doubling the separator when the resource already leads with it.
    sep = '' if resource.startswith(separator) else separator
    return arn + '%s%s%s' % (resource_type, sep, resource)

419 

420 

def snapshot_identifier(prefix, db_identifier):
    """Return an identifier for a snapshot of a database or cluster.
    """
    stamp = datetime.now().strftime('%Y-%m-%d-%H-%M')
    return '%s-%s-%s' % (prefix, db_identifier, stamp)

426 

427 

# Dedicated logger so retry noise can be filtered independently.
retry_log = logging.getLogger('c7n.retry')


def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
    """Decorator for retry boto3 api call on transient errors.

    https://www.awsarchitectureblog.com/2015/03/backoff.html
    https://en.wikipedia.org/wiki/Exponential_backoff

    :param codes: A sequence of retryable error codes.
    :param max_attempts: The max number of retries, by default the delay
       time is proportional to the max number of attempts.
    :param log_retries: Whether we should log retries, if specified
       specifies the level at which the retry should be logged.
    :param _max_delay: The maximum delay for any retry interval *note*
       this parameter is only exposed for unit testing, as its
       derived from the number of attempts.

    Returns a function for invoking aws client calls that
    retries on retryable error codes.
    """
    # ceiling for the geometric backoff sequence.
    max_delay = max(min_delay, 2) ** max_attempts

    def _retry(func, *args, ignore_err_codes=(), **kw):
        for idx, delay in enumerate(
                backoff_delays(min_delay, max_delay, jitter=True)):
            try:
                return func(*args, **kw)
            except ClientError as e:
                if e.response['Error']['Code'] in ignore_err_codes:
                    # treat as success with no result.
                    return
                elif e.response['Error']['Code'] not in retry_codes:
                    raise
                elif idx == max_attempts - 1:
                    # retries exhausted, surface the last error.
                    raise
                if log_retries:
                    # log_retries doubles as the logging level to emit at.
                    retry_log.log(
                        log_retries,
                        "retrying %s on error:%s attempt:%d last delay:%0.2f",
                        func, e.response['Error']['Code'], idx, delay)
            time.sleep(delay)
    return _retry

470 

471 

def backoff_delays(start, stop, factor=2.0, jitter=False):
    """Geometric backoff sequence w/ jitter
    """
    delay = start
    while delay <= stop:
        if jitter:
            # shave off up to 20% of the nominal delay.
            yield delay - (delay * random.random() / 5)
        else:
            yield delay
        delay = delay * factor

482 

483 

def parse_cidr(value):
    """Process cidr ranges."""
    # collections of cidrs become an IPv4List of parsed members.
    if isinstance(value, (list, set)):
        return IPv4List([parse_cidr(item) for item in value])
    # bare addresses (no mask) parse as addresses, not networks.
    klass = ipaddress.ip_address if '/' not in value else IPv4Network
    try:
        return klass(str(value))
    except (ipaddress.AddressValueError, ValueError):
        return None

496 

497 

class IPv4Network(ipaddress.IPv4Network):
    """IPv4Network where ``in`` also tests network-to-network containment."""

    # Override for net 2 net containment comparison
    def __contains__(self, other):
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            # a network is "in" this one when this one is its supernet.
            return self.supernet_of(other)
        return super(IPv4Network, self).__contains__(other)

    # Backport of the py3.7 subnet comparison helpers for python <= 3.6.
    if (sys.version_info.major == 3 and sys.version_info.minor <= 6):  # pragma: no cover
        @staticmethod
        def _is_subnet_of(a, b):
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (b.network_address <= a.network_address and
                        b.broadcast_address >= a.broadcast_address)
            except AttributeError:
                raise TypeError(f"Unable to test subnet containment "
                                f"between {a} and {b}")

        def supernet_of(self, other):
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)

524 

525 

class IPv4List:
    """Collection of networks/addresses supporting ``in`` membership tests."""

    def __init__(self, ipv4_list):
        self.ipv4_list = ipv4_list

    def __contains__(self, other):
        if other is None:
            return False
        # networks match by containment, addresses by equality.
        network_hit = any(
            [other in elem for elem in self.ipv4_list
             if isinstance(elem, IPv4Network)])
        address_hit = any(
            [other == elem for elem in self.ipv4_list
             if isinstance(elem, ipaddress.IPv4Address)])
        return network_hit or address_hit

538 

539 

def reformat_schema(model):
    """ Reformat schema to be in a more displayable format. """
    if not hasattr(model, 'schema'):
        return "Model '{}' does not have a schema".format(model)

    if 'properties' not in model.schema:
        return "Schema in unexpected format."

    props = copy.deepcopy(model.schema['properties'])

    # 'type' is implied by the model, no need to display it.
    props.pop('type', None)

    # fold the required list into per-property markers for display.
    for name in model.schema.get('required', []):
        if name in props:
            props[name]['required'] = True

    return props

558 

559 

560# from botocore.utils avoiding runtime dependency for botocore for other providers. 

561# license apache 2.0 

# from botocore.utils avoiding runtime dependency for botocore for other providers.
# license apache 2.0
def set_value_from_jmespath(source, expression, value, is_first=True):
    """Set *value* into *source* at a dotted (jmespath-like) *expression*.

    Limitations:
      * Only handles dotted lookups
      * No offsets/wildcards/slices/etc.
    """
    current_key, _, remainder = expression.partition('.')

    if not current_key:
        raise ValueError(expression)

    if remainder:
        if current_key not in source:
            # create intermediate dictionaries for new keys on the path.
            source[current_key] = {}
        return set_value_from_jmespath(
            source[current_key], remainder, value, is_first=False)

    # down to a single key, set it.
    source[current_key] = value

590 

591 

def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
    """
    Format all string values in an object.
    Return the updated object

    :param obj: dict/list/str tree to format; other types pass through.
    :param err_fallback: exception types that leave a string unformatted
        instead of raising.
    :param formatter: optional callable used instead of ``str.format``.
    """
    if isinstance(obj, dict):
        # propagate err_fallback/formatter positionally so nested values
        # honor the caller's settings and *args aren't mis-bound.
        return {
            key: format_string_values(val, err_fallback, formatter, *args, **kwargs)
            for key, val in obj.items()}
    elif isinstance(obj, list):
        return [
            format_string_values(item, err_fallback, formatter, *args, **kwargs)
            for item in obj]
    elif isinstance(obj, str):
        try:
            if formatter:
                return formatter(obj, *args, **kwargs)
            else:
                return obj.format(*args, **kwargs)
        except err_fallback:
            return obj
    else:
        return obj

617 

618 

def parse_url_config(url):
    """Parse a url into a config Bag of scheme/netloc/path plus query params."""
    if url and '://' not in url:
        # bare identifiers are treated as a scheme-only url.
        url += "://"
    parsed = urlparse.urlparse(url)
    conf = config.Bag()
    for attr in ('scheme', 'netloc', 'path'):
        conf[attr] = getattr(parsed, attr)
    for param, values in urlparse.parse_qs(parsed.query).items():
        # only the first value of a repeated query param is kept.
        conf[param] = values[0]
    conf['url'] = url
    return conf

630 

631 

def join_output_path(output_path, *parts):
    """Join *parts* onto an output path, handling plain paths and urls."""
    # user-interpolated output paths pass through untouched.
    if '{' in output_path:
        return output_path

    if "://" not in output_path:
        return os.path.join(output_path, *parts)

    # urls may carry query strings, so splice into the path component only.
    parsed = urlparse.urlparse(output_path)
    pieces = list(parsed)
    pieces[2] = "/".join((parsed.path, *parts))
    return urlparse.urlunparse(pieces)

646 

647 

def get_policy_provider(policy_data):
    """Derive the cloud provider name from a policy's resource type."""
    resource = policy_data['resource']
    if isinstance(resource, list):
        # multi-resource policies take the provider of the first entry.
        provider_name, _ = resource[0].split('.', 1)
        return provider_name
    if '.' in resource:
        provider_name, _ = resource.split('.', 1)
        return provider_name
    # unprefixed resource types default to aws.
    return 'aws'

656 

657 

def get_proxy_url(url):
    """Resolve the proxy url configured in the environment for *url*.

    Returns None when no proxy applies. Note this does not support
    no_proxy ip or cidr entries.
    """
    parsed = urlparse.urlparse(url)
    proxies = getproxies()

    # candidate proxy keys from most to least specific.
    candidates = [
        parsed.scheme + '://' + parsed.netloc,
        parsed.scheme,
        'all://' + parsed.netloc,
        'all'
    ]

    # default the port by scheme when the url doesn't carry one.
    port = parsed.port
    if port is None:
        if parsed.scheme == 'http':
            port = 80
        elif parsed.scheme == 'https':
            port = 443

    hostname = parsed.hostname if parsed.hostname is not None else ''

    # Determine if proxy should be used based on no_proxy entries.
    if proxy_bypass("%s:%s" % (hostname, port)):
        return None

    for key in candidates:
        if key in proxies:
            return proxies[key]

    return None

688 

689 

class DeferredFormatString(UserString):
    """A string that returns itself when formatted

    Let any format spec pass through. This lets us selectively defer
    expansion of runtime variables without losing format spec details.
    """
    def __format__(self, format_spec):
        # re-emit the placeholder, preserving any format spec verbatim.
        spec = f":{format_spec}" if format_spec else ""
        return "{" + self.data + spec + "}"

698 

699 

class FormatDate:
    """a datetime wrapper with extended pyformat syntax"""

    # matches increment suffixes like "+1d", "+30M", "+2h" in a format spec.
    date_increment = re.compile(r'\+[0-9]+[Mdh]')

    def __init__(self, d=None):
        self._d = d

    def __str__(self):
        return str(self._d)

    @property
    def datetime(self):
        return self._d

    @classmethod
    def utcnow(cls):
        return cls(datetime.utcnow())

    def __getattr__(self, k):
        # delegate unknown attributes to the wrapped datetime.
        return getattr(self._d, k)

    def __format__(self, fmt=None):
        result = self._d
        increments = self.date_increment.findall(fmt)
        for token in increments:
            unit = token[-1]
            amount = float(token[1:-1])
            offset = {}
            if unit == 'M':
                offset['minutes'] = amount
            if unit == 'h':
                offset['hours'] = amount
            if unit == 'd':
                offset['days'] = amount
            result = result + timedelta(**offset)
        if increments:
            # strip the increment tokens before handing off to strftime.
            fmt = self.date_increment.sub("", fmt)
        return result.__format__(fmt)

737 

738 

class QueryParser:
    """Validate and normalize user supplied resource `query` blocks.

    Subclasses define ``QuerySchema`` (key -> allowed value type or tuple
    of permitted values) and ``type_name``; ``parse()`` is the entry
    point and returns normalized query dictionaries suitable for passing
    to the service api.
    """

    QuerySchema = {}
    type_name = ''
    # Allow multiple values to be passed to a query param
    multi_value = True
    # If using multi_value, specify scalar fields here
    single_value_fields = ()

    @classmethod
    def is_implicit_query_filter(cls, data):
        """Detect filter key/value pairs given without a 'Filters' wrapper."""
        key = list(data[0].keys())[0]
        if (key not in cls.QuerySchema and 'Filters' in cls.QuerySchema and
                (key in cls.QuerySchema['Filters'] or key.startswith('tag:'))):
            return True
        return False

    @classmethod
    def implicit_qfilter_translate(cls, data):
        """Wrap bare key/value pairs into the canonical Filters structure."""
        filters = []
        for d in data:
            key = list(d.keys())[0]
            values = list(d.values())[0]
            if not isinstance(values, list):
                values = [values]
            filters.append({'Name': key, 'Values': values})
        return [{'Filters': filters}]

    @classmethod
    def parse(cls, data):
        """Validate *data* and return normalized query dicts.

        :raises PolicyValidationError: on any malformed query entry.
        """
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Format, must be array of dicts"
            )

        # Backwards compatibility
        if data:
            if not isinstance(data[0], dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Format, must be array of dicts"
                )
            # Check for query filter key value pairs not listed under 'Filters' key
            if cls.is_implicit_query_filter(data):
                data = cls.implicit_qfilter_translate(data)

            # Support iam-policy and elasticache 'Name', 'Value' queries without 'Filters' key
            if (data[0].get('Value') and
                    (cls.type_name == 'IAM Policy' or cls.type_name == 'ElastiCache')):
                try:
                    data = [{d['Name']: d['Value']} for d in data]
                except KeyError:
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Format. "
                        f"Query: {data} is not a list of key-value pairs "
                        f"from {cls.QuerySchema}"
                    )

            # Support ebs-snapshot and volume 'Name', 'Values' queries without 'Filters' key
            elif (data[0].get('Values') and
                    (cls.type_name == 'EBS Snapshot' or
                     cls.type_name == 'EBS Volume')):
                data = [{"Filters": data}]

        results = []
        names = set()
        for d in data:
            if not isinstance(d, dict):
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )
            # each entry must carry exactly one key.
            if not len(list(d.keys())) == 1:
                raise PolicyValidationError(
                    f"Query Invalid Format. Must be a list of key-value pairs "
                    f"from {cls.QuerySchema}"
                )

            if d.get("Filters"):
                results.append({"Filters": cls.parse_qfilters(d["Filters"])})
            else:
                key, value = cls.parse_query(d)

                # Allow for multiple queries with the same key
                if key in names and (not cls.multi_value or key in cls.single_value_fields):
                    raise PolicyValidationError(
                        f"{cls.type_name} Query Invalid Key: {key} Must be unique")
                elif key in names:
                    # merge repeated keys into the existing entry's value list.
                    for q in results:
                        if list(q.keys())[0] == key:
                            q[key].append(d[key])
                else:
                    names.add(key)
                    results.append({key: value})

        return results

    @classmethod
    def parse_qfilters(cls, data):
        """Validate a 'Filters' array of {'Name': ..., 'Values': [...]} dicts."""
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
            )

        results = []
        names = set()
        for f in data:
            if not isinstance(f, dict):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Format, must be array of dicts"
                )
            if "Name" not in f or "Values" not in f:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid: Each filter must "
                    "contain 'Name' and 'Values' keys."
                )

            key = f['Name']
            values = f['Values']

            if key not in cls.QuerySchema.get("Filters", {}) and not key.startswith('tag:'):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Key: {key} "
                    f"Valid: {', '.join(cls.QuerySchema.keys())}"
                )

            if not isinstance(values, list):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Filter Invalid Value {f} for key {key}, must be array.")

            vtype = cls.QuerySchema["Filters"].get(key)
            # tag filters aren't individually enumerated; values are strings.
            if vtype is None and key.startswith('tag'):
                vtype = str

            for v in values:
                cls.type_check(vtype, v)

            # Allow for multiple queries with the same key
            if key in names:
                for qf in results:
                    if qf['Name'] == key:
                        qf['Values'].extend(values)
            else:
                names.add(key)
                results.append({'Name': key, 'Values': values})

        return results

    @classmethod
    def parse_query(cls, data):
        """Validate a single {key: value} query entry; returns (key, values)."""
        key = list(data.keys())[0]
        values = list(data.values())[0]

        if (not cls.multi_value or key in cls.single_value_fields) and isinstance(values, list):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value {values}: Value for {key} must be scalar"
            )
        elif (cls.multi_value and key not in cls.single_value_fields
                and not isinstance(values, list)):
            # normalize scalars to a single-element list for multi-value keys.
            values = [values]

        if key not in cls.QuerySchema:
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Key: {key} "
                f"Valid: {', '.join(cls.QuerySchema.keys())}"
            )

        vtype = cls.QuerySchema.get(key)
        if isinstance(values, list):
            for v in values:
                cls.type_check(vtype, v)
        else:
            cls.type_check(vtype, values)

        return key, values

    @classmethod
    def type_check(cls, vtype, value):
        """Check *value* against a schema type (tuple enum, 'date', or type)."""
        # a tuple vtype enumerates the permitted values.
        if isinstance(vtype, tuple):
            if value not in vtype:
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Value: {value} Valid: {', '.join(vtype)}")
        elif vtype == 'date':
            if not parse_date(value):
                raise PolicyValidationError(
                    f"{cls.type_name} Query Invalid Date Value: {value}")
        elif not isinstance(value, vtype):
            raise PolicyValidationError(
                f"{cls.type_name} Query Invalid Value Type {value}"
            )

929 

def get_annotation_prefix(s):
    """Prefix *s* with the custodian annotation namespace."""
    return f'c7n:{s}'

932 

933 

def merge_dict_list(dict_iter):
    """take an list of dictionaries and merge them.

    last dict wins/overwrites on keys.
    """
    merged = {}
    for mapping in dict_iter:
        merged.update(mapping)
    return merged

943 

944 

def merge_dict(a, b):
    """Perform a merge of dictionaries A and B

    Any subdictionaries will be recursively merged.
    Any leaf elements in the form of scalar will use the value from B.
    If A is a str and B is a list, A will be inserted into the front of the list.
    If A is a list and B is a str, B will be appended to the list.
    If there are two lists for the same key, the lists will be merged
    deduplicated with values in A first, followed by any additional values from B.
    """
    merged = copy.deepcopy(a)
    for key, bval in b.items():
        if key not in merged:
            merged[key] = bval
        elif isinstance(merged[key], dict) and isinstance(bval, dict):
            merged[key] = merge_dict(merged[key], bval)
        elif isinstance(merged[key], list) and isinstance(bval, list):
            # dedup while keeping a's ordering first.
            for item in bval:
                if item not in merged[key]:
                    merged[key].append(item)
        elif isinstance(bval, str) and isinstance(merged[key], list):
            if bval not in merged[key]:
                merged[key].append(bval)
        elif isinstance(bval, list) and isinstance(merged[key], str):
            if merged[key] in bval:
                merged[key] = bval
            else:
                existing = merged[key]
                merged[key] = [existing]
                merged[key].extend(bval)
        elif isinstance(bval, (int, str, float, bool)):
            # scalars: b wins.
            merged[key] = bval
        else:
            raise Exception(f"k={key}, {type(bval)} and {type(merged[key])} not conformable.")
    return merged

981 

982 

def compare_dicts_using_sets(a, b) -> bool:
    """Compares two dicts and replaces any lists or strings with sets

    Compares any lists in the dict as sets.
    """
    # NOTE(review): mutates *a* in place -- its list/str values are
    # replaced by sets as a side effect of the comparison.
    if a.keys() != b.keys():
        return False

    for k, v in b.items():
        # normalize both sides so list ordering and str-vs-[str] don't matter.
        if isinstance(v, list):
            v = format_to_set(v)
        if isinstance(a[k], str):
            a[k] = format_to_set(a[k])
        if isinstance(a[k], list):
            a[k] = format_to_set(a[k])
        if isinstance(v, str):
            v = format_to_set(v)
        if isinstance(a[k], dict) and isinstance(v, dict):
            # recurse for nested dicts; fall through to equality otherwise.
            if compare_dicts_using_sets(a[k], v):
                continue
        if v != a[k]:
            return False
    return True

1007 

1008 

def format_to_set(x) -> set:
    """Formats lists and strings to sets.

    Strings return as a set with one string.
    Lists return as a set.
    Variables of other datatypes will return as the original datatype.
    """
    if isinstance(x, str):
        return {x}
    if isinstance(x, list):
        return set(x)
    return x

1022 

1023 

def format_dict_with_sets(x: dict) -> dict:
    """Formats string and list values in a dict to sets.

    Any string value returns as a set with one string.
    Any list values return as a set.
    Returns a formatted dict.
    """
    if not isinstance(x, dict):
        return x
    converted = {}
    for key, value in x.items():
        # recurse for nested dicts, otherwise normalize the leaf value.
        if isinstance(value, dict):
            converted[key] = format_dict_with_sets(value)
        else:
            converted[key] = format_to_set(value)
    return converted

1041 

1042 

def select_keys(d, keys):
    """Project *d* onto *keys*, filling missing keys with None."""
    return {k: d.get(k) for k in keys}

1048 

1049 

def get_human_size(size, precision=2):
    # interesting discussion on 1024 vs 1000 as base
    # https://en.wikipedia.org/wiki/Binary_prefix
    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    unit_idx = 0
    while size > 1024:
        unit_idx += 1
        size = size / 1024.0

    return "%.*f %s" % (precision, size, units[unit_idx])

1060 

1061 

def get_support_region(manager):
    """Resolve the single endpoint region for the AWS support service.

    support is a unique service in that it doesnt support regional
    endpoints, so the client must be constructed against the one region
    each partition exposes:
    https://docs.aws.amazon.com/general/latest/gr/awssupport.html

    aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
    https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
    https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html

    Returns None for partitions without a support endpoint mapping.
    """
    endpoint_regions = {
        "aws": "us-east-1",
        "aws-us-gov": "us-gov-west-1",
        "aws-cn": "cn-north-1",
    }
    partition = get_partition(manager.config.region)
    return endpoint_regions.get(partition)

1080 

1081 

def get_resource_tagging_region(resource_type, region):
    """Pick the region for resource-group tagging API calls.

    For global resources, tags don't populate in the get_resources call
    unless the call is being made to us-east-1. For govcloud this is
    us-gov-west-1.
    """
    partition = get_partition(region)
    is_global = bool(getattr(resource_type, 'global_resource', None))
    if is_global and partition == "aws":
        return 'us-east-1'
    if is_global and partition == "aws-us-gov":
        return 'us-gov-west-1'
    return region

1092 

1093 

def get_eni_resource_type(eni):
    """Classify an ENI by the kind of resource that owns it.

    Instance attachments map straight to 'ec2'; everything else is
    keyed off the ENI's Description field, using the conventional
    strings the owning AWS services write there. Unrecognized
    descriptions yield 'unknown'.

    :param eni: network interface description dict (ec2 API shape)
    :return: short resource-type token ('ec2', 'elb-app', 'lambda', ...)
    """
    attachment = eni.get('Attachment') or {}
    if attachment.get('InstanceId'):
        return 'ec2'

    description = eni.get('Description')

    # Services that write a fixed, exact description string.
    exact_matches = {
        'ENI managed by APIGateway': 'apigw',
        'DMSNetworkInterface': 'dms',
        'RDSNetworkInterface': 'rds',
        'RedshiftNetworkInterface': 'redshift',
    }
    if description in exact_matches:
        return exact_matches[description]

    # Prefix table — order matters: more specific prefixes must come
    # before their generic counterparts (e.g. 'ELB app/' before 'ELB',
    # 'AWS Lambda VPC ENI' before 'AWS Lambda VPC').
    prefix_matches = (
        ('ELB app/', 'elb-app'),
        ('ELB net/', 'elb-net'),
        ('ELB gwy/', 'elb-gwy'),
        ('ELB', 'elb'),
        ('AWS CodeStar Connections', 'codestar'),
        ('DAX', 'dax'),
        ('AWS created network interface for directory', 'dir'),
        ('arn:aws:ecs:', 'ecs'),
        ('EFS mount target for', 'fsmt'),
        ('ElastiCache', 'elasticache'),
        ('AWS ElasticMapReduce', 'emr'),
        ('CloudHSM Managed Interface', 'hsm'),
        ('CloudHsm ENI', 'hsmv2'),
        ('AWS Lambda VPC ENI', 'lambda'),
        ('AWS Lambda VPC', 'lambda'),
        ('Interface for NAT Gateway', 'nat'),
        ('Network interface for DBProxy', 'rds'),
        ('Network Interface for Transit Gateway Attachment', 'tgw'),
        ('VPC Endpoint Interface', 'vpce'),
        ('aws-k8s-branch-eni', 'eks'),
    )
    for prefix, rtype in prefix_matches:
        if description.startswith(prefix):
            return rtype
    return 'unknown'

1155 

1156 

class C7NJmespathFunctions(functions.Functions):
    """Custom jmespath functions made available in custodian expressions."""

    @functions.signature(
        {'types': ['string']}, {'types': ['string']}
    )
    def _func_split(self, separator, value):
        # split(sep, str) -> list of substrings
        return value.split(separator)

    @functions.signature(
        {'types': ['string']}
    )
    def _func_from_json(self, value):
        # from_json(str) -> parsed data, or null when the string
        # is not valid json
        try:
            parsed = json.loads(value)
        except json.JSONDecodeError:
            parsed = None
        return parsed

1172 

1173 

class C7NJMESPathParser(Parser):
    """Parser whose results default to custodian's custom jmespath options."""

    def parse(self, expression):
        # Re-wrap the stock parse result so searches pick up the
        # custodian function extensions by default.
        parsed = super().parse(expression)
        return ParsedResultWithOptions(
            expression=parsed.expression, parsed=parsed.parsed)

1181 

1182 

class ParsedResultWithOptions(ParsedResult):
    """ParsedResult whose search() injects custodian's jmespath options."""

    def search(self, value, options=None):
        # Explicitly supplied options are honored; otherwise fall back
        # to options carrying the custodian custom functions.
        effective = options or jmespath.Options(
            custom_functions=C7NJmespathFunctions())
        return super().search(value, effective)

1189 

1190 

def jmespath_search(*args, **kwargs):
    """jmespath.search with custodian's custom functions enabled."""
    opts = jmespath.Options(custom_functions=C7NJmespathFunctions())
    return jmespath.search(*args, options=opts, **kwargs)

1197 

1198 

def get_path(path: str, resource: dict):
    """
    Fetch a value from a resource by field name or jmespath expression.

    jmespath_search is expensive and it's rarely the case that
    there is a path in the id field, so plain field names are looked up
    directly and only dotted paths go through jmespath.

    :param path: the path or field name to fetch
    :param resource: the resource instance description
    :return: the field/path value
    """
    if '.' not in path:
        return resource[path]
    return jmespath_search(path, resource)

1213 

1214 

def jmespath_compile(expression):
    """Compile a jmespath expression using custodian's parser subclass."""
    return C7NJMESPathParser().parse(expression)

1218 

1219 

def snap_to_period_start(start: datetime, end: datetime, period_start: str):
    """
    Adjust the start and end timestamps according to `period_start`.

    Args:
        start (datetime): The original start datetime.
        end (datetime): The original end datetime.
        period_start (str): One of 'auto', 'start-of-day'.

    Returns:
        Tuple[datetime, datetime]: Adjusted (start, end)
    """
    # 'auto' (or any other value) leaves the bounds untouched.
    if period_start != "start-of-day":
        return start, end
    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    return start.replace(**midnight), end.replace(**midnight)