Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/c7n/utils.py: 27% (569 statements)
coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import copy
from collections import UserString
from datetime import datetime, timedelta
from dateutil.tz import tzutc
import json
import itertools
import ipaddress
import logging
import os
import random
import re
import sys
import threading
import time
from urllib import parse as urlparse
from urllib.request import getproxies, proxy_bypass

from dateutil.parser import ParserError, parse

import jmespath
from jmespath import functions
from jmespath.parser import Parser, ParsedResult

from c7n import config
from c7n.exceptions import ClientError, PolicyValidationError

# Try to play nice in a serverless environment, where we don't require yaml

try:
    import yaml
except ImportError:  # pragma: no cover
    SafeLoader = BaseSafeDumper = yaml = None
else:
    try:
        from yaml import CSafeLoader as SafeLoader, CSafeDumper as BaseSafeDumper
    except ImportError:  # pragma: no cover
        from yaml import SafeLoader, SafeDumper as BaseSafeDumper

class SafeDumper(BaseSafeDumper or object):
    def ignore_aliases(self, data):
        return True

log = logging.getLogger('custodian.utils')

class VarsSubstitutionError(Exception):
    pass

def load_file(path, format=None, vars=None):
    if format is None:
        format = 'yaml'
        _, ext = os.path.splitext(path)
        if ext[1:] == 'json':
            format = 'json'

    with open(path) as fh:
        contents = fh.read()

    if vars:
        try:
            contents = contents.format(**vars)
        except IndexError:
            msg = 'Failed to substitute variable by positional argument.'
            raise VarsSubstitutionError(msg)
        except KeyError as e:
            msg = 'Failed to substitute variables. KeyError on {}'.format(str(e))
            raise VarsSubstitutionError(msg)

    if format == 'yaml':
        return yaml_load(contents)
    elif format == 'json':
        return loads(contents)

def yaml_load(value):
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.load(value, Loader=SafeLoader)

def yaml_dump(value):
    if yaml is None:
        raise RuntimeError("Yaml not available")
    return yaml.dump(value, default_flow_style=False, Dumper=SafeDumper)

def loads(body):
    return json.loads(body)

def dumps(data, fh=None, indent=0):
    if fh:
        return json.dump(data, fh, cls=JsonEncoder, indent=indent)
    else:
        return json.dumps(data, cls=JsonEncoder, indent=indent)

def format_event(evt):
    return json.dumps(evt, indent=2)

def filter_empty(d):
    for k, v in list(d.items()):
        if not v:
            del d[k]
    return d

# We need a minimum floor when examining possible timestamp
# values to distinguish from other numeric time usages. Use
# the S3 Launch Date.
DATE_FLOOR = time.mktime((2006, 3, 19, 0, 0, 0, 0, 0, 0))

def parse_date(v, tz=None):
    """Handle various permutations of a datetime serialization
    to a datetime with the given timezone.

    Handles strings, seconds since epoch, and milliseconds since epoch.
    """

    if v is None:
        return v

    tz = tz or tzutc()

    if isinstance(v, datetime):
        if v.tzinfo is None:
            return v.astimezone(tz)
        return v

    if isinstance(v, str) and not v.isdigit():
        try:
            return parse(v).astimezone(tz)
        except (AttributeError, TypeError, ValueError, OverflowError):
            pass

    # OSError on windows -- https://bugs.python.org/issue36439
    exceptions = (ValueError, OSError) if os.name == "nt" else (ValueError,)

    if isinstance(v, (int, float, str)):
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v)).astimezone(tz)
        except exceptions:
            pass

    if isinstance(v, (int, float, str)):
        # try interpreting as milliseconds epoch
        try:
            if float(v) > DATE_FLOOR:
                v = datetime.fromtimestamp(float(v) / 1000).astimezone(tz)
        except exceptions:
            pass

    return isinstance(v, datetime) and v or None
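# Usage sketch (illustrative, not part of the original module):
#
#   >>> parse_date('2023-12-08T06:51:00+00:00')  # ISO string -> aware datetime
#   >>> parse_date(1700000000)        # seconds since epoch -> aware datetime
#   >>> parse_date('1700000000000')   # milliseconds since epoch -> aware datetime
#   >>> parse_date('not-a-date')      # unparseable -> None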

def type_schema(
        type_name, inherits=None, rinherit=None,
        aliases=None, required=None, **props):
    """jsonschema generation helper

    params:
      - type_name: name of the type
      - inherits: list of document fragments that are required via anyOf[$ref]
      - rinherit: use another schema as a base for this, basically work around
        inherits issues with additionalProperties and type enums.
      - aliases: additional names this type may be called
      - required: list of required properties, by default 'type' is required
      - props: additional key value properties
    """
    if aliases:
        type_names = [type_name]
        type_names.extend(aliases)
    else:
        type_names = [type_name]

    if rinherit:
        s = copy.deepcopy(rinherit)
        s['properties']['type'] = {'enum': type_names}
    else:
        s = {
            'type': 'object',
            'properties': {
                'type': {'enum': type_names}}}

    # Ref based inheritance and additional properties don't mix well.
    # https://stackoverflow.com/questions/22689900/json-schema-allof-with-additionalproperties
    if not inherits:
        s['additionalProperties'] = False

    s['properties'].update(props)

    for k, v in props.items():
        if v is None:
            del s['properties'][k]
    if not required:
        required = []
    if isinstance(required, list):
        required.append('type')
    s['required'] = required
    if inherits:
        extended = s
        s = {'allOf': [{'$ref': i} for i in inherits]}
        s['allOf'].append(extended)
    return s
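# Usage sketch (illustrative, not part of the original module):
#
#   >>> type_schema('mark', days={'type': 'number'}, required=['days'])
#   {'type': 'object',
#    'properties': {'type': {'enum': ['mark']}, 'days': {'type': 'number'}},
#    'additionalProperties': False,
#    'required': ['days', 'type']}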

class JsonEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, FormatDate):
            return obj.datetime.isoformat()
        if isinstance(obj, bytes):
            return obj.decode('utf8', errors="ignore")
        return json.JSONEncoder.default(self, obj)

def group_by(resources, key):
    """Return a mapping of key value to resources with the corresponding value.

    Key may be specified as dotted form for nested dictionary lookup
    """
    resource_map = {}
    parts = key.split('.')
    for r in resources:
        v = r
        for k in parts:
            v = v.get(k)
            if not isinstance(v, dict):
                break
        resource_map.setdefault(v, []).append(r)
    return resource_map
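# Usage sketch (illustrative, not part of the original module):
#
#   >>> group_by([{'a': {'b': 1}}, {'a': {'b': 2}}], 'a.b')
#   {1: [{'a': {'b': 1}}], 2: [{'a': {'b': 2}}]}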

def chunks(iterable, size=50):
    """Break an iterable into lists of size"""
    batch = []
    for n in iterable:
        batch.append(n)
        if len(batch) % size == 0:
            yield batch
            batch = []
    if batch:
        yield batch
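# Usage sketch (illustrative, not part of the original module):
#
#   >>> list(chunks(range(5), size=2))
#   [[0, 1], [2, 3], [4]]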

def camelResource(obj, implicitDate=False, implicitTitle=True):
    """Some sources from apis return lowerCased keys whereas describe calls
    always return TitleCase; this function turns the former into the latter.

    implicitDate ~ automatically sniff keys that look like isoformat date strings
    and convert to python datetime objects.
    """
    if not isinstance(obj, dict):
        return obj
    for k in list(obj.keys()):
        v = obj.pop(k)
        if implicitTitle:
            ok = "%s%s" % (k[0].upper(), k[1:])
        else:
            ok = k
        obj[ok] = v

        if implicitDate:
            # config service handles datetime differently than describe sdks
            # the sdks use knowledge of the shape to support language native
            # date times, while config just turns everything into a serialized
            # json with mangled keys without type info. to normalize to describe
            # we implicitly sniff keys which look like datetimes, and have an
            # isoformat marker ('T').
            kn = k.lower()
            if isinstance(v, (str, int)) and ('time' in kn or 'date' in kn):
                try:
                    dv = parse_date(v)
                except ParserError:
                    dv = None
                if dv:
                    obj[ok] = dv
        if isinstance(v, dict):
            camelResource(v, implicitDate, implicitTitle)
        elif isinstance(v, list):
            for e in v:
                camelResource(e, implicitDate, implicitTitle)
    return obj
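# Usage sketch (illustrative, not part of the original module):
#
#   >>> camelResource({'instanceId': 'i-123', 'tags': [{'key': 'env'}]})
#   {'InstanceId': 'i-123', 'Tags': [{'Key': 'env'}]}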

def get_account_id_from_sts(session):
    response = session.client('sts').get_caller_identity()
    return response.get('Account')

def get_account_alias_from_sts(session):
    response = session.client('iam').list_account_aliases()
    aliases = response.get('AccountAliases', ())
    return aliases and aliases[0] or ''

def query_instances(session, client=None, **query):
    """Return a list of ec2 instances for the query.
    """
    if client is None:
        client = session.client('ec2')
    p = client.get_paginator('describe_instances')
    results = p.paginate(**query)
    return list(itertools.chain(
        *[r["Instances"] for r in itertools.chain(
            *[pp['Reservations'] for pp in results])]))

CONN_CACHE = threading.local()

def local_session(factory, region=None):
    """Cache a session thread local for up to 45m"""
    factory_region = getattr(factory, 'region', 'global')
    if region:
        factory_region = region
    s = getattr(CONN_CACHE, factory_region, {}).get('session')
    t = getattr(CONN_CACHE, factory_region, {}).get('time')

    n = time.time()
    if s is not None and t + (60 * 45) > n:
        return s
    s = factory()

    setattr(CONN_CACHE, factory_region, {'session': s, 'time': n})
    return s

def reset_session_cache():
    for k in [k for k in dir(CONN_CACHE) if not k.startswith('_')]:
        setattr(CONN_CACHE, k, {})

def annotation(i, k):
    return i.get(k, ())

def set_annotation(i, k, v):
    """
    >>> x = {}
    >>> set_annotation(x, 'marker', 'a')
    >>> annotation(x, 'marker')
    ['a']
    """
    if not isinstance(i, dict):
        raise ValueError("Can only annotate dictionaries")

    if not isinstance(v, list):
        v = [v]

    if k in i:
        ev = i.get(k)
        if isinstance(ev, list):
            ev.extend(v)
    else:
        i[k] = v

def parse_s3(s3_path):
    if not s3_path.startswith('s3://'):
        raise ValueError("invalid s3 path")
    ridx = s3_path.find('/', 5)
    if ridx == -1:
        ridx = None
    bucket = s3_path[5:ridx]
    s3_path = s3_path.rstrip('/')
    if ridx is None:
        key_prefix = ""
    else:
        key_prefix = s3_path[s3_path.find('/', 5):]
    return s3_path, bucket, key_prefix
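# Usage sketch (illustrative, not part of the original module):
#
#   >>> parse_s3('s3://my-bucket/some/prefix')
#   ('s3://my-bucket/some/prefix', 'my-bucket', '/some/prefix')
#   >>> parse_s3('s3://my-bucket')
#   ('s3://my-bucket', 'my-bucket', '')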

REGION_PARTITION_MAP = {
    'us-gov-east-1': 'aws-us-gov',
    'us-gov-west-1': 'aws-us-gov',
    'cn-north-1': 'aws-cn',
    'cn-northwest-1': 'aws-cn',
    'us-isob-east-1': 'aws-iso-b',
    'us-iso-east-1': 'aws-iso'
}

def get_partition(region):
    return REGION_PARTITION_MAP.get(region, 'aws')

def generate_arn(
        service, resource, partition='aws',
        region=None, account_id=None, resource_type=None, separator='/'):
    """Generate an Amazon Resource Name.
    See http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html.
    """
    if region and region in REGION_PARTITION_MAP:
        partition = REGION_PARTITION_MAP[region]
    if service == 's3':
        region = ''
    arn = 'arn:%s:%s:%s:%s:' % (
        partition, service, region if region else '', account_id if account_id else '')
    if resource_type:
        if resource.startswith(separator):
            separator = ''
        arn = arn + '%s%s%s' % (resource_type, separator, resource)
    else:
        arn = arn + resource
    return arn
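# Usage sketch (illustrative, not part of the original module; the
# account id below is a placeholder):
#
#   >>> generate_arn('sqs', 'my-queue', region='us-east-1', account_id='123456789012')
#   'arn:aws:sqs:us-east-1:123456789012:my-queue'
#   >>> generate_arn('rds', 'mydb', region='us-east-1',
#   ...              account_id='123456789012', resource_type='db', separator=':')
#   'arn:aws:rds:us-east-1:123456789012:db:mydb'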

def snapshot_identifier(prefix, db_identifier):
    """Return an identifier for a snapshot of a database or cluster.
    """
    now = datetime.now()
    return '%s-%s-%s' % (prefix, db_identifier, now.strftime('%Y-%m-%d-%H-%M'))

retry_log = logging.getLogger('c7n.retry')

def get_retry(retry_codes=(), max_attempts=8, min_delay=1, log_retries=False):
    """Decorator for retrying boto3 api calls on transient errors.

    https://www.awsarchitectureblog.com/2015/03/backoff.html
    https://en.wikipedia.org/wiki/Exponential_backoff

    :param retry_codes: A sequence of retryable error codes.
    :param max_attempts: The max number of retries; by default the delay
        time is proportional to the max number of attempts.
    :param min_delay: The minimum delay for any retry interval; the
        maximum delay is derived from the number of attempts.
    :param log_retries: Whether we should log retries; if specified,
        gives the level at which the retry should be logged.

    Returns a function for invoking aws client calls that
    retries on retryable error codes.
    """
    max_delay = max(min_delay, 2) ** max_attempts

    def _retry(func, *args, ignore_err_codes=(), **kw):
        for idx, delay in enumerate(
                backoff_delays(min_delay, max_delay, jitter=True)):
            try:
                return func(*args, **kw)
            except ClientError as e:
                if e.response['Error']['Code'] in ignore_err_codes:
                    return
                elif e.response['Error']['Code'] not in retry_codes:
                    raise
                elif idx == max_attempts - 1:
                    raise
                if log_retries:
                    retry_log.log(
                        log_retries,
                        "retrying %s on error:%s attempt:%d last delay:%0.2f",
                        func, e.response['Error']['Code'], idx, delay)
            time.sleep(delay)
    return _retry
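# Usage sketch (illustrative, not part of the original module; `client`
# stands in for any boto3 client):
#
#   retry = get_retry(('Throttling', 'RequestLimitExceeded'))
#   retry(client.describe_instances, InstanceIds=['i-123'])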

def backoff_delays(start, stop, factor=2.0, jitter=False):
    """Geometric backoff sequence w/ jitter
    """
    cur = start
    while cur <= stop:
        if jitter:
            yield cur - (cur * random.random() / 5)
        else:
            yield cur
        cur = cur * factor
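# Usage sketch (illustrative, not part of the original module):
#
#   >>> list(backoff_delays(1, 8))
#   [1, 2, 4, 8]
#
# With jitter=True each delay is reduced by up to 20% of its value.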

def parse_cidr(value):
    """Process cidr ranges."""
    if isinstance(value, list) or isinstance(value, set):
        return IPv4List([parse_cidr(item) for item in value])
    klass = IPv4Network
    if '/' not in value:
        klass = ipaddress.ip_address
    try:
        v = klass(str(value))
    except (ipaddress.AddressValueError, ValueError):
        v = None
    return v

class IPv4Network(ipaddress.IPv4Network):

    # Override for net 2 net containment comparison
    def __contains__(self, other):
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            return self.supernet_of(other)
        return super(IPv4Network, self).__contains__(other)

    if (sys.version_info.major == 3 and sys.version_info.minor <= 6):  # pragma: no cover
        @staticmethod
        def _is_subnet_of(a, b):
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (b.network_address <= a.network_address and
                        b.broadcast_address >= a.broadcast_address)
            except AttributeError:
                raise TypeError(f"Unable to test subnet containment "
                                f"between {a} and {b}")

        def supernet_of(self, other):
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)

class IPv4List:
    def __init__(self, ipv4_list):
        self.ipv4_list = ipv4_list

    def __contains__(self, other):
        if other is None:
            return False
        in_networks = any([other in y_elem for y_elem in self.ipv4_list
                           if isinstance(y_elem, IPv4Network)])
        in_addresses = any([other == y_elem for y_elem in self.ipv4_list
                            if isinstance(y_elem, ipaddress.IPv4Address)])
        return any([in_networks, in_addresses])
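# Usage sketch (illustrative, not part of the original module):
#
#   >>> parse_cidr('10.0.0.1') in parse_cidr('10.0.0.0/24')
#   True
#   >>> parse_cidr('10.0.0.0/25') in parse_cidr('10.0.0.0/24')  # net-in-net
#   True
#   >>> parse_cidr('10.0.0.5') in parse_cidr(['10.0.0.0/24', '192.168.1.1'])
#   True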

def reformat_schema(model):
    """ Reformat schema to be in a more displayable format. """
    if not hasattr(model, 'schema'):
        return "Model '{}' does not have a schema".format(model)

    if 'properties' not in model.schema:
        return "Schema in unexpected format."

    ret = copy.deepcopy(model.schema['properties'])

    if 'type' in ret:
        del ret['type']

    for key in model.schema.get('required', []):
        if key in ret:
            ret[key]['required'] = True

    return ret

# from botocore.utils, avoiding a runtime dependency on botocore for other providers.
# license apache 2.0
def set_value_from_jmespath(source, expression, value, is_first=True):
    # This takes a (limited) jmespath-like expression & can set a value based
    # on it.
    # Limitations:
    # * Only handles dotted lookups
    # * No offsets/wildcards/slices/etc.
    bits = expression.split('.', 1)
    current_key, remainder = bits[0], bits[1] if len(bits) > 1 else ''

    if not current_key:
        raise ValueError(expression)

    if remainder:
        if current_key not in source:
            # We've got something in the expression that's not present in the
            # source (new key). If there's any more bits, we'll set the key
            # with an empty dictionary.
            source[current_key] = {}

        return set_value_from_jmespath(
            source[current_key],
            remainder,
            value,
            is_first=False
        )

    # If we're down to a single key, set it.
    source[current_key] = value
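# Usage sketch (illustrative, not part of the original module):
#
#   >>> d = {}
#   >>> set_value_from_jmespath(d, 'Tags.Owner', 'data-eng')
#   >>> d
#   {'Tags': {'Owner': 'data-eng'}}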

def format_string_values(obj, err_fallback=(IndexError, KeyError), formatter=None, *args, **kwargs):
    """
    Format all string values in an object.
    Return the updated object
    """
    if isinstance(obj, dict):
        new = {}
        for key in obj.keys():
            new[key] = format_string_values(obj[key], formatter=formatter, *args, **kwargs)
        return new
    elif isinstance(obj, list):
        new = []
        for item in obj:
            new.append(format_string_values(item, formatter=formatter, *args, **kwargs))
        return new
    elif isinstance(obj, str):
        try:
            if formatter:
                return formatter(obj, *args, **kwargs)
            else:
                return obj.format(*args, **kwargs)
        except err_fallback:
            return obj
    else:
        return obj
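# Usage sketch (illustrative, not part of the original module):
#
#   >>> format_string_values({'name': '{env}-app', 'count': 3}, env='prod')
#   {'name': 'prod-app', 'count': 3}
#
# Unresolvable placeholders fall back to the original string via err_fallback.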

def parse_url_config(url):
    if url and '://' not in url:
        url += "://"
    conf = config.Bag()
    parsed = urlparse.urlparse(url)
    for k in ('scheme', 'netloc', 'path'):
        conf[k] = getattr(parsed, k)
    for k, v in urlparse.parse_qs(parsed.query).items():
        conf[k] = v[0]
    conf['url'] = url
    return conf
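# Usage sketch (illustrative, not part of the original module):
#
#   >>> conf = parse_url_config('s3://my-bucket/logs?region=us-east-1')
#   >>> conf['scheme'], conf['netloc'], conf['region']
#   ('s3', 'my-bucket', 'us-east-1')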

def join_output_path(output_path, *parts):
    # allow users to specify interpolated output paths
    if '{' in output_path:
        return output_path

    if "://" not in output_path:
        return os.path.join(output_path, *parts)

    # handle urls with query strings
    parsed = urlparse.urlparse(output_path)
    updated_path = "/".join((parsed.path, *parts))
    parts = list(parsed)
    parts[2] = updated_path
    return urlparse.urlunparse(parts)
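# Usage sketch (illustrative, not part of the original module):
#
#   >>> join_output_path('s3://bucket/logs?x=1', 'policy', '2023')
#   's3://bucket/logs/policy/2023?x=1'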

def get_policy_provider(policy_data):
    if isinstance(policy_data['resource'], list):
        provider_name, _ = policy_data['resource'][0].split('.', 1)
    elif '.' in policy_data['resource']:
        provider_name, resource_type = policy_data['resource'].split('.', 1)
    else:
        provider_name = 'aws'
    return provider_name

def get_proxy_url(url):
    proxies = getproxies()
    parsed = urlparse.urlparse(url)

    proxy_keys = [
        parsed.scheme + '://' + parsed.netloc,
        parsed.scheme,
        'all://' + parsed.netloc,
        'all'
    ]

    # Set port if not defined explicitly in url.
    port = parsed.port
    if port is None and parsed.scheme == 'http':
        port = 80
    elif port is None and parsed.scheme == 'https':
        port = 443

    hostname = parsed.hostname is not None and parsed.hostname or ''

    # Determine if proxy should be used based on no_proxy entries.
    # Note this does not support no_proxy ip or cidr entries.
    if proxy_bypass("%s:%s" % (hostname, port)):
        return None

    for key in proxy_keys:
        if key in proxies:
            return proxies[key]

    return None

class DeferredFormatString(UserString):
    """A string that returns itself when formatted

    Let any format spec pass through. This lets us selectively defer
    expansion of runtime variables without losing format spec details.
    """
    def __format__(self, format_spec):
        return "".join(("{", self.data, f":{format_spec}" if format_spec else "", "}"))

class FormatDate:
    """a datetime wrapper with extended pyformat syntax"""

    date_increment = re.compile(r'\+[0-9]+[Mdh]')

    def __init__(self, d=None):
        self._d = d

    def __str__(self):
        return str(self._d)

    @property
    def datetime(self):
        return self._d

    @classmethod
    def utcnow(cls):
        return cls(datetime.utcnow())

    def __getattr__(self, k):
        return getattr(self._d, k)

    def __format__(self, fmt=None):
        d = self._d
        increments = self.date_increment.findall(fmt)
        for i in increments:
            p = {}
            if i[-1] == 'M':
                p['minutes'] = float(i[1:-1])
            if i[-1] == 'h':
                p['hours'] = float(i[1:-1])
            if i[-1] == 'd':
                p['days'] = float(i[1:-1])
            d = d + timedelta(**p)
        if increments:
            fmt = self.date_increment.sub("", fmt)
        return d.__format__(fmt)
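# Usage sketch (illustrative, not part of the original module): the
# "+<n><unit>" increment syntax supports M (minutes), h (hours), d (days).
#
#   >>> format(FormatDate(datetime(2023, 1, 1)), '%Y-%m-%d+1d')
#   '2023-01-02'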

class QueryParser:

    QuerySchema = {}
    type_name = ''
    multi_value = True
    value_key = 'Values'

    @classmethod
    def parse(cls, data):
        filters = []
        if not isinstance(data, (tuple, list)):
            raise PolicyValidationError(
                "%s Query invalid format, must be array of dicts %s" % (
                    cls.type_name,
                    data))
        for d in data:
            if not isinstance(d, dict):
                raise PolicyValidationError(
                    "%s Query Filter Invalid %s" % (cls.type_name, data))
            if "Name" not in d or cls.value_key not in d:
                raise PolicyValidationError(
                    "%s Query Filter Invalid: Missing Key or Values in %s" % (
                        cls.type_name, data))

            key = d['Name']
            values = d[cls.value_key]

            if not cls.multi_value and isinstance(values, list):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Key: Value:%s Must be single valued" % (
                        cls.type_name, key))
            elif not cls.multi_value:
                values = [values]

            if key not in cls.QuerySchema and not key.startswith('tag:'):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Key:%s Valid: %s" % (
                        cls.type_name, key, ", ".join(cls.QuerySchema.keys())))

            vtype = cls.QuerySchema.get(key)
            if vtype is None and key.startswith('tag'):
                vtype = str

            if not isinstance(values, list):
                raise PolicyValidationError(
                    "%s Query Filter Invalid Values, must be array %s" % (
                        cls.type_name, data,))

            for v in values:
                if isinstance(vtype, tuple):
                    if v not in vtype:
                        raise PolicyValidationError(
                            "%s Query Filter Invalid Value: %s Valid: %s" % (
                                cls.type_name, v, ", ".join(vtype)))
                elif not isinstance(v, vtype):
                    raise PolicyValidationError(
                        "%s Query Filter Invalid Value Type %s" % (
                            cls.type_name, data,))

            filters.append(d)

        return filters

def get_annotation_prefix(s):
    return 'c7n:{}'.format(s)

def merge_dict_list(dict_iter):
    """take a list of dictionaries and merge them.

    last dict wins/overwrites on keys.
    """
    result = {}
    for d in dict_iter:
        result.update(d)
    return result

def merge_dict(a, b):
    """Perform a merge of dictionaries a and b

    Any subdictionaries will be recursively merged.
    Any conflicting leaf elements in the form of a list or scalar
    will take the value from b.
    """
    d = {}
    for k, v in a.items():
        if k not in b:
            d[k] = v
        elif isinstance(v, dict) and isinstance(b[k], dict):
            d[k] = merge_dict(v, b[k])
    for k, v in b.items():
        if k not in d:
            d[k] = v
    return d
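# Usage sketch (illustrative, not part of the original module):
#
#   >>> merge_dict({'a': 1, 'nested': {'x': 1}}, {'nested': {'y': 2}, 'b': 2})
#   {'a': 1, 'nested': {'x': 1, 'y': 2}, 'b': 2}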

def select_keys(d, keys):
    result = {}
    for k in keys:
        result[k] = d.get(k)
    return result

def get_human_size(size, precision=2):
    # interesting discussion on 1024 vs 1000 as base
    # https://en.wikipedia.org/wiki/Binary_prefix
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    suffixIndex = 0
    while size > 1024:
        suffixIndex += 1
        size = size / 1024.0

    return "%.*f %s" % (precision, size, suffixes[suffixIndex])
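# Usage sketch (illustrative, not part of the original module):
#
#   >>> get_human_size(1536)
#   '1.50 KB'
#   >>> get_human_size(5 * 1024 ** 3)
#   '5.00 GB'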

def get_support_region(manager):
    # support is a unique service in that it doesn't support regional endpoints
    # thus, we need to construct the client based on the regions found here:
    # https://docs.aws.amazon.com/general/latest/gr/awssupport.html
    #
    # aws-cn uses cn-north-1 for both the Beijing and Ningxia regions
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Beijing.html
    # https://docs.amazonaws.cn/en_us/aws/latest/userguide/endpoints-Ningxia.html

    partition = get_partition(manager.config.region)
    support_region = None
    if partition == "aws":
        support_region = "us-east-1"
    elif partition == "aws-us-gov":
        support_region = "us-gov-west-1"
    elif partition == "aws-cn":
        support_region = "cn-north-1"
    return support_region

def get_eni_resource_type(eni):
    if eni.get('Attachment'):
        instance_id = eni['Attachment'].get('InstanceId')
    else:
        instance_id = None
    description = eni.get('Description')
    # EC2
    if instance_id:
        rtype = 'ec2'
    # ELB/ELBv2
    elif description.startswith('ELB app/'):
        rtype = 'elb-app'
    elif description.startswith('ELB net/'):
        rtype = 'elb-net'
    elif description.startswith('ELB gwy/'):
        rtype = 'elb-gwy'
    elif description.startswith('ELB'):
        rtype = 'elb'
    # Other Resources
    elif description == 'ENI managed by APIGateway':
        rtype = 'apigw'
    elif description.startswith('AWS CodeStar Connections'):
        rtype = 'codestar'
    elif description.startswith('DAX'):
        rtype = 'dax'
    elif description.startswith('AWS created network interface for directory'):
        rtype = 'dir'
    elif description == 'DMSNetworkInterface':
        rtype = 'dms'
    elif description.startswith('arn:aws:ecs:'):
        rtype = 'ecs'
    elif description.startswith('EFS mount target for'):
        rtype = 'fsmt'
    elif description.startswith('ElastiCache'):
        rtype = 'elasticache'
    elif description.startswith('AWS ElasticMapReduce'):
        rtype = 'emr'
    elif description.startswith('CloudHSM Managed Interface'):
        rtype = 'hsm'
    elif description.startswith('CloudHsm ENI'):
        rtype = 'hsmv2'
    elif description.startswith('AWS Lambda VPC'):
        rtype = 'lambda'
    elif description.startswith('AWS Lambda VPC ENI'):
        rtype = 'lambda'
    elif description.startswith('Interface for NAT Gateway'):
        rtype = 'nat'
    elif (description == 'RDSNetworkInterface' or
            description.startswith('Network interface for DBProxy')):
        rtype = 'rds'
    elif description == 'RedshiftNetworkInterface':
        rtype = 'redshift'
    elif description.startswith('Network Interface for Transit Gateway Attachment'):
        rtype = 'tgw'
    elif description.startswith('VPC Endpoint Interface'):
        rtype = 'vpce'
    elif description.startswith('aws-k8s-branch-eni'):
        rtype = 'eks'
    else:
        rtype = 'unknown'
    return rtype

class C7NJmespathFunctions(functions.Functions):
    @functions.signature(
        {'types': ['string']}, {'types': ['string']}
    )
    def _func_split(self, sep, string):
        return string.split(sep)

class C7NJMESPathParser(Parser):
    def parse(self, expression):
        result = super().parse(expression)
        return ParsedResultWithOptions(
            expression=result.expression,
            parsed=result.parsed
        )

class ParsedResultWithOptions(ParsedResult):
    def search(self, value, options=None):
        # if options are explicitly passed in, we honor those
        if not options:
            options = jmespath.Options(custom_functions=C7NJmespathFunctions())
        return super().search(value, options)

def jmespath_search(*args, **kwargs):
    return jmespath.search(
        *args,
        **kwargs,
        options=jmespath.Options(custom_functions=C7NJmespathFunctions())
    )

def jmespath_compile(expression):
    parsed = C7NJMESPathParser().parse(expression)
    return parsed
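# Usage sketch (illustrative, not part of the original module): both helpers
# wire in the custom split() function defined above.
#
#   >>> jmespath_search("split('-', Tag)", {'Tag': 'env-prod-web'})
#   ['env', 'prod', 'web']
#   >>> jmespath_compile('Tags[0]').search({'Tags': ['a', 'b']})
#   'a'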