Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

867 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import dataclasses 

23import operator 

24import sys 

25import typing 

26import warnings 

27from functools import cached_property 

28from dataclasses import dataclass, field 

29from types import MethodDescriptorType, ModuleType, MethodType 

30 

31from IPython.utils.decorators import undoc 

32 

33import types 

34from typing import Self, LiteralString, get_type_hints 

35 

36if sys.version_info < (3, 12): 

37 from typing_extensions import TypeAliasType 

38else: 

39 from typing import TypeAliasType 

40 

41 

42@undoc 

43class HasGetItem(Protocol): 

44 def __getitem__(self, key) -> None: 

45 ... 

46 

47 

48@undoc 

49class InstancesHaveGetItem(Protocol): 

50 def __call__(self, *args, **kwargs) -> HasGetItem: 

51 ... 

52 

53 

54@undoc 

55class HasGetAttr(Protocol): 

56 def __getattr__(self, key) -> None: 

57 ... 

58 

59 

60@undoc 

61class DoesNotHaveGetAttr(Protocol): 

62 pass 

63 

64 

65# By default `__getattr__` is not explicitly implemented on most objects 

66MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr] 

67 

68 

69def _unbind_method(func: Callable) -> Union[Callable, None]: 

70 """Get unbound method for given bound method. 

71 

72 Returns None if cannot get unbound method, or method is already unbound. 

73 """ 

74 owner = getattr(func, "__self__", None) 

75 owner_class = type(owner) 

76 name = getattr(func, "__name__", None) 

77 instance_dict_overrides = getattr(owner, "__dict__", None) 

78 if ( 

79 owner is not None 

80 and name 

81 and ( 

82 not instance_dict_overrides 

83 or (instance_dict_overrides and name not in instance_dict_overrides) 

84 ) 

85 ): 

86 return getattr(owner_class, name) 

87 return None 

88 

89 

90@undoc 

91@dataclass 

92class EvaluationPolicy: 

93 """Definition of evaluation policy.""" 

94 

95 allow_locals_access: bool = False 

96 allow_globals_access: bool = False 

97 allow_item_access: bool = False 

98 allow_attr_access: bool = False 

99 allow_builtins_access: bool = False 

100 allow_all_operations: bool = False 

101 allow_any_calls: bool = False 

102 allow_auto_import: bool = False 

103 allowed_calls: set[Callable] = field(default_factory=set) 

104 

105 def can_get_item(self, value, item): 

106 return self.allow_item_access 

107 

108 def can_get_attr(self, value, attr): 

109 return self.allow_attr_access 

110 

111 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

112 if self.allow_all_operations: 

113 return True 

114 

115 def can_call(self, func): 

116 if self.allow_any_calls: 

117 return True 

118 

119 if func in self.allowed_calls: 

120 return True 

121 

122 owner_method = _unbind_method(func) 

123 

124 if owner_method and owner_method in self.allowed_calls: 

125 return True 

126 

127 

128def _get_external(module_name: str, access_path: Sequence[str]): 

129 """Get value from external module given a dotted access path. 

130 

131 Only gets value if the module is already imported. 

132 

133 Raises: 

134 * `KeyError` if module is removed not found, and 

135 * `AttributeError` if access path does not match an exported object 

136 """ 

137 try: 

138 member_type = sys.modules[module_name] 

139 # standard module 

140 for attr in access_path: 

141 member_type = getattr(member_type, attr) 

142 return member_type 

143 except (KeyError, AttributeError): 

144 # handle modules in namespace packages 

145 module_path = ".".join([module_name, *access_path]) 

146 if module_path in sys.modules: 

147 return sys.modules[module_path] 

148 raise 

149 

150 

151def _has_original_dunder_external( 

152 value, 

153 module_name: str, 

154 access_path: Sequence[str], 

155 method_name: str, 

156): 

157 if module_name not in sys.modules: 

158 full_module_path = ".".join([module_name, *access_path]) 

159 if full_module_path not in sys.modules: 

160 # LBYLB as it is faster 

161 return False 

162 try: 

163 member_type = _get_external(module_name, access_path) 

164 value_type = type(value) 

165 if type(value) == member_type: 

166 return True 

167 if isinstance(member_type, ModuleType): 

168 value_module = getmodule(value_type) 

169 if not value_module or not value_module.__name__: 

170 return False 

171 if ( 

172 value_module.__name__ == member_type.__name__ 

173 or value_module.__name__.startswith(member_type.__name__ + ".") 

174 ): 

175 return True 

176 if method_name == "__getattribute__": 

177 # we have to short-circuit here due to an unresolved issue in 

178 # `isinstance` implementation: https://bugs.python.org/issue32683 

179 return False 

180 if not isinstance(member_type, ModuleType) and isinstance(value, member_type): 

181 method = getattr(value_type, method_name, None) 

182 member_method = getattr(member_type, method_name, None) 

183 if member_method == method: 

184 return True 

185 if isinstance(member_type, ModuleType): 

186 method = getattr(value_type, method_name, None) 

187 for base_class in value_type.__mro__[1:]: 

188 base_module = getmodule(base_class) 

189 if base_module and ( 

190 base_module.__name__ == member_type.__name__ 

191 or base_module.__name__.startswith(member_type.__name__ + ".") 

192 ): 

193 # Check if the method comes from this trusted base class 

194 base_method = getattr(base_class, method_name, None) 

195 if base_method is not None and base_method == method: 

196 return True 

197 except (AttributeError, KeyError): 

198 return False 

199 

200 

201def _has_original_dunder( 

202 value, allowed_types, allowed_methods, allowed_external, method_name 

203): 

204 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

205 # we only need to check at class level 

206 value_type = type(value) 

207 

208 # strict type check passes → no need to check method 

209 if value_type in allowed_types: 

210 return True 

211 

212 method = getattr(value_type, method_name, None) 

213 

214 if method is None: 

215 return None 

216 

217 if method in allowed_methods: 

218 return True 

219 

220 for module_name, *access_path in allowed_external: 

221 if _has_original_dunder_external(value, module_name, access_path, method_name): 

222 return True 

223 

224 return False 

225 

226 

227def _coerce_path_to_tuples( 

228 allow_list: set[tuple[str, ...] | str], 

229) -> set[tuple[str, ...]]: 

230 """Replace dotted paths on the provided allow-list with tuples.""" 

231 return { 

232 path if isinstance(path, tuple) else tuple(path.split(".")) 

233 for path in allow_list 

234 } 

235 

236 

237@undoc 

238@dataclass 

239class SelectivePolicy(EvaluationPolicy): 

240 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set) 

241 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set) 

242 

243 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set) 

244 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set) 

245 

246 allowed_operations: set = field(default_factory=set) 

247 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set) 

248 

249 allow_getitem_on_types: bool = field(default_factory=bool) 

250 

251 _operation_methods_cache: dict[str, set[Callable]] = field( 

252 default_factory=dict, init=False 

253 ) 

254 

255 def can_get_attr(self, value, attr): 

256 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external) 

257 

258 has_original_attribute = _has_original_dunder( 

259 value, 

260 allowed_types=self.allowed_getattr, 

261 allowed_methods=self._getattribute_methods, 

262 allowed_external=allowed_getattr_external, 

263 method_name="__getattribute__", 

264 ) 

265 has_original_attr = _has_original_dunder( 

266 value, 

267 allowed_types=self.allowed_getattr, 

268 allowed_methods=self._getattr_methods, 

269 allowed_external=allowed_getattr_external, 

270 method_name="__getattr__", 

271 ) 

272 

273 accept = False 

274 

275 # Many objects do not have `__getattr__`, this is fine. 

276 if has_original_attr is None and has_original_attribute: 

277 accept = True 

278 else: 

279 # Accept objects without modifications to `__getattr__` and `__getattribute__` 

280 accept = has_original_attr and has_original_attribute 

281 

282 if accept: 

283 # We still need to check for overridden properties. 

284 

285 value_class = type(value) 

286 if not hasattr(value_class, attr): 

287 return True 

288 

289 class_attr_val = getattr(value_class, attr) 

290 is_property = isinstance(class_attr_val, property) 

291 

292 if not is_property: 

293 return True 

294 

295 # Properties in allowed types are ok (although we do not include any 

296 # properties in our default allow list currently). 

297 if type(value) in self.allowed_getattr: 

298 return True # pragma: no cover 

299 

300 # Properties in subclasses of allowed types may be ok if not changed 

301 for module_name, *access_path in allowed_getattr_external: 

302 try: 

303 external_class = _get_external(module_name, access_path) 

304 external_class_attr_val = getattr(external_class, attr) 

305 except (KeyError, AttributeError): 

306 return False # pragma: no cover 

307 return class_attr_val == external_class_attr_val 

308 

309 return False 

310 

311 def can_get_item(self, value, item): 

312 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified.""" 

313 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external) 

314 if self.allow_getitem_on_types: 

315 # e.g. Union[str, int] or Literal[True, 1] 

316 if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)): 

317 return True 

318 # PEP 560 e.g. list[str] 

319 if isinstance(value, type) and hasattr(value, "__class_getitem__"): 

320 return True 

321 return _has_original_dunder( 

322 value, 

323 allowed_types=self.allowed_getitem, 

324 allowed_methods=self._getitem_methods, 

325 allowed_external=allowed_getitem_external, 

326 method_name="__getitem__", 

327 ) 

328 

329 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

330 allowed_operations_external = _coerce_path_to_tuples( 

331 self.allowed_operations_external 

332 ) 

333 objects = [a] 

334 if b is not None: 

335 objects.append(b) 

336 return all( 

337 [ 

338 _has_original_dunder( 

339 obj, 

340 allowed_types=self.allowed_operations, 

341 allowed_methods=self._operator_dunder_methods(dunder), 

342 allowed_external=allowed_operations_external, 

343 method_name=dunder, 

344 ) 

345 for dunder in dunders 

346 for obj in objects 

347 ] 

348 ) 

349 

350 def _operator_dunder_methods(self, dunder: str) -> set[Callable]: 

351 if dunder not in self._operation_methods_cache: 

352 self._operation_methods_cache[dunder] = self._safe_get_methods( 

353 self.allowed_operations, dunder 

354 ) 

355 return self._operation_methods_cache[dunder] 

356 

357 @cached_property 

358 def _getitem_methods(self) -> set[Callable]: 

359 return self._safe_get_methods(self.allowed_getitem, "__getitem__") 

360 

361 @cached_property 

362 def _getattr_methods(self) -> set[Callable]: 

363 return self._safe_get_methods(self.allowed_getattr, "__getattr__") 

364 

365 @cached_property 

366 def _getattribute_methods(self) -> set[Callable]: 

367 return self._safe_get_methods(self.allowed_getattr, "__getattribute__") 

368 

369 def _safe_get_methods(self, classes, name) -> set[Callable]: 

370 return { 

371 method 

372 for class_ in classes 

373 for method in [getattr(class_, name, None)] 

374 if method 

375 } 

376 

377 

378class _DummyNamedTuple(NamedTuple): 

379 """Used internally to retrieve methods of named tuple instance.""" 

380 

381 

382EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] 

383 

384 

385@dataclass 

386class EvaluationContext: 

387 #: Local namespace 

388 locals: dict 

389 #: Global namespace 

390 globals: dict 

391 #: Evaluation policy identifier 

392 evaluation: EvaluationPolicyName = "forbidden" 

393 #: Whether the evaluation of code takes place inside of a subscript. 

394 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. 

395 in_subscript: bool = False 

396 #: Auto import method 

397 auto_import: Callable[list[str], ModuleType] | None = None 

398 #: Overrides for evaluation policy 

399 policy_overrides: dict = field(default_factory=dict) 

400 #: Transient local namespace used to store mocks 

401 transient_locals: dict = field(default_factory=dict) 

402 #: Transients of class level 

403 class_transients: dict | None = None 

404 #: Instance variable name used in the method definition 

405 instance_arg_name: str | None = None 

406 #: Currently associated value 

407 #: Useful for adding items to _Duck on annotated assignment 

408 current_value: ast.AST | None = None 

409 

410 def replace(self, /, **changes): 

411 """Return a new copy of the context, with specified changes""" 

412 return dataclasses.replace(self, **changes) 

413 

414 

415class _IdentitySubscript: 

416 """Returns the key itself when item is requested via subscript.""" 

417 

418 def __getitem__(self, key): 

419 return key 

420 

421 

422IDENTITY_SUBSCRIPT = _IdentitySubscript() 

423SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__" 

424UNKNOWN_SIGNATURE = Signature() 

425NOT_EVALUATED = object() 

426 

427 

428class GuardRejection(Exception): 

429 """Exception raised when guard rejects evaluation attempt.""" 

430 

431 pass 

432 

433 

434def guarded_eval(code: str, context: EvaluationContext): 

435 """Evaluate provided code in the evaluation context. 

436 

437 If evaluation policy given by context is set to ``forbidden`` 

438 no evaluation will be performed; if it is set to ``dangerous`` 

439 standard :func:`eval` will be used; finally, for any other, 

440 policy :func:`eval_node` will be called on parsed AST. 

441 """ 

442 locals_ = context.locals 

443 

444 if context.evaluation == "forbidden": 

445 raise GuardRejection("Forbidden mode") 

446 

447 # note: not using `ast.literal_eval` as it does not implement 

448 # getitem at all, for example it fails on simple `[0][1]` 

449 

450 if context.in_subscript: 

451 # syntactic sugar for ellipsis (:) is only available in subscripts 

452 # so we need to trick the ast parser into thinking that we have 

453 # a subscript, but we need to be able to later recognise that we did 

454 # it so we can ignore the actual __getitem__ operation 

455 if not code: 

456 return tuple() 

457 locals_ = locals_.copy() 

458 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT 

459 code = SUBSCRIPT_MARKER + "[" + code + "]" 

460 context = context.replace(locals=locals_) 

461 

462 if context.evaluation == "dangerous": 

463 return eval(code, context.globals, context.locals) 

464 

465 node = ast.parse(code, mode="exec") 

466 

467 return eval_node(node, context) 

468 

469 

470BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = { 

471 ast.Add: ("__add__",), 

472 ast.Sub: ("__sub__",), 

473 ast.Mult: ("__mul__",), 

474 ast.Div: ("__truediv__",), 

475 ast.FloorDiv: ("__floordiv__",), 

476 ast.Mod: ("__mod__",), 

477 ast.Pow: ("__pow__",), 

478 ast.LShift: ("__lshift__",), 

479 ast.RShift: ("__rshift__",), 

480 ast.BitOr: ("__or__",), 

481 ast.BitXor: ("__xor__",), 

482 ast.BitAnd: ("__and__",), 

483 ast.MatMult: ("__matmul__",), 

484} 

485 

486COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = { 

487 ast.Eq: ("__eq__",), 

488 ast.NotEq: ("__ne__", "__eq__"), 

489 ast.Lt: ("__lt__", "__gt__"), 

490 ast.LtE: ("__le__", "__ge__"), 

491 ast.Gt: ("__gt__", "__lt__"), 

492 ast.GtE: ("__ge__", "__le__"), 

493 ast.In: ("__contains__",), 

494 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially 

495} 

496 

497UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = { 

498 ast.USub: ("__neg__",), 

499 ast.UAdd: ("__pos__",), 

500 # we have to check both __inv__ and __invert__! 

501 ast.Invert: ("__invert__", "__inv__"), 

502 ast.Not: ("__not__",), 

503} 

504 

505GENERIC_CONTAINER_TYPES = (dict, list, set, tuple, frozenset) 

506 

507 

508class ImpersonatingDuck: 

509 """A dummy class used to create objects of other classes without calling their ``__init__``""" 

510 

511 # no-op: override __class__ to impersonate 

512 

513 

514class _Duck: 

515 """A dummy class used to create objects pretending to have given attributes""" 

516 

517 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

518 self.attributes = attributes if attributes is not None else {} 

519 self.items = items if items is not None else {} 

520 

521 def __getattr__(self, attr: str): 

522 return self.attributes[attr] 

523 

524 def __hasattr__(self, attr: str): 

525 return attr in self.attributes 

526 

527 def __dir__(self): 

528 return [*dir(super), *self.attributes] 

529 

530 def __getitem__(self, key: str): 

531 return self.items[key] 

532 

533 def __hasitem__(self, key: str): 

534 return self.items[key] 

535 

536 def _ipython_key_completions_(self): 

537 return self.items.keys() 

538 

539 

540def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

541 dunder = None 

542 for op, candidate_dunder in dunders.items(): 

543 if isinstance(node_op, op): 

544 dunder = candidate_dunder 

545 return dunder 

546 

547 

548def get_policy(context: EvaluationContext) -> EvaluationPolicy: 

549 policy = copy(EVALUATION_POLICIES[context.evaluation]) 

550 

551 for key, value in context.policy_overrides.items(): 

552 if hasattr(policy, key): 

553 setattr(policy, key, value) 

554 return policy 

555 

556 

557def _validate_policy_overrides( 

558 policy_name: EvaluationPolicyName, policy_overrides: dict 

559) -> bool: 

560 policy = EVALUATION_POLICIES[policy_name] 

561 

562 all_good = True 

563 for key, value in policy_overrides.items(): 

564 if not hasattr(policy, key): 

565 warnings.warn( 

566 f"Override {key!r} is not valid with {policy_name!r} evaluation policy" 

567 ) 

568 all_good = False 

569 return all_good 

570 

571 

572def _is_type_annotation(obj) -> bool: 

573 """ 

574 Returns True if obj is a type annotation, False otherwise. 

575 """ 

576 if isinstance(obj, type): 

577 return True 

578 if isinstance(obj, types.GenericAlias): 

579 return True 

580 if hasattr(types, "UnionType") and isinstance(obj, types.UnionType): 

581 return True 

582 if isinstance(obj, (typing._SpecialForm, typing._BaseGenericAlias)): 

583 return True 

584 if isinstance(obj, typing.TypeVar): 

585 return True 

586 # Types that support __class_getitem__ 

587 if isinstance(obj, type) and hasattr(obj, "__class_getitem__"): 

588 return True 

589 # Fallback: check if get_origin returns something 

590 if hasattr(typing, "get_origin") and get_origin(obj) is not None: 

591 return True 

592 

593 return False 

594 

595 

596def _handle_assign(node: ast.Assign, context: EvaluationContext): 

597 value = eval_node(node.value, context) 

598 transient_locals = context.transient_locals 

599 policy = get_policy(context) 

600 class_transients = context.class_transients 

601 for target in node.targets: 

602 if isinstance(target, (ast.Tuple, ast.List)): 

603 # Handle unpacking assignment 

604 values = list(value) 

605 targets = target.elts 

606 starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)] 

607 

608 # Unified handling: treat no starred as starred at end 

609 star_or_last_idx = starred[0] if starred else len(targets) 

610 

611 # Before starred 

612 for i in range(star_or_last_idx): 

613 # Check for self.x assignment 

614 if _is_instance_attribute_assignment(targets[i], context): 

615 class_transients[targets[i].attr] = values[i] 

616 else: 

617 transient_locals[targets[i].id] = values[i] 

618 

619 # Starred if exists 

620 if starred: 

621 end = len(values) - (len(targets) - star_or_last_idx - 1) 

622 if _is_instance_attribute_assignment( 

623 targets[star_or_last_idx], context 

624 ): 

625 class_transients[targets[star_or_last_idx].attr] = values[ 

626 star_or_last_idx:end 

627 ] 

628 else: 

629 transient_locals[targets[star_or_last_idx].value.id] = values[ 

630 star_or_last_idx:end 

631 ] 

632 

633 # After starred 

634 for i in range(star_or_last_idx + 1, len(targets)): 

635 if _is_instance_attribute_assignment(targets[i], context): 

636 class_transients[targets[i].attr] = values[ 

637 len(values) - (len(targets) - i) 

638 ] 

639 else: 

640 transient_locals[targets[i].id] = values[ 

641 len(values) - (len(targets) - i) 

642 ] 

643 elif isinstance(target, ast.Subscript): 

644 if isinstance(target.value, ast.Name): 

645 name = target.value.id 

646 container = transient_locals.get(name) 

647 if container is None: 

648 container = context.locals.get(name) 

649 if container is None: 

650 container = context.globals.get(name) 

651 if container is None: 

652 raise NameError( 

653 f"{name} not found in locals, globals, nor builtins" 

654 ) 

655 storage_dict = transient_locals 

656 storage_key = name 

657 elif isinstance( 

658 target.value, ast.Attribute 

659 ) and _is_instance_attribute_assignment(target.value, context): 

660 attr = target.value.attr 

661 container = class_transients.get(attr, None) 

662 if container is None: 

663 raise NameError(f"{attr} not found in class transients") 

664 storage_dict = class_transients 

665 storage_key = attr 

666 else: 

667 return 

668 

669 key = eval_node(target.slice, context) 

670 attributes = ( 

671 dict.fromkeys(dir(container)) 

672 if policy.can_call(container.__dir__) 

673 else {} 

674 ) 

675 items = {} 

676 

677 if policy.can_get_item(container, None): 

678 try: 

679 items = dict(container.items()) 

680 except Exception: 

681 pass 

682 

683 items[key] = value 

684 duck_container = _Duck(attributes=attributes, items=items) 

685 storage_dict[storage_key] = duck_container 

686 elif _is_instance_attribute_assignment(target, context): 

687 class_transients[target.attr] = value 

688 else: 

689 transient_locals[target.id] = value 

690 return None 

691 

692 

693def _handle_annassign(node, context): 

694 context_with_value = context.replace(current_value=getattr(node, "value", None)) 

695 annotation_result = eval_node(node.annotation, context_with_value) 

696 if _is_type_annotation(annotation_result): 

697 annotation_value = _resolve_annotation(annotation_result, context) 

698 # Use Value for generic types 

699 use_value = ( 

700 isinstance(annotation_value, GENERIC_CONTAINER_TYPES) and node.value is not None 

701 ) 

702 else: 

703 annotation_value = annotation_result 

704 use_value = False 

705 

706 # LOCAL VARIABLE 

707 if getattr(node, "simple", False) and isinstance(node.target, ast.Name): 

708 name = node.target.id 

709 if use_value: 

710 return _handle_assign( 

711 ast.Assign(targets=[node.target], value=node.value), context 

712 ) 

713 context.transient_locals[name] = annotation_value 

714 return None 

715 

716 # INSTANCE ATTRIBUTE 

717 if _is_instance_attribute_assignment(node.target, context): 

718 attr = node.target.attr 

719 if use_value: 

720 return _handle_assign( 

721 ast.Assign(targets=[node.target], value=node.value), context 

722 ) 

723 context.class_transients[attr] = annotation_value 

724 return None 

725 

726 return None 

727 

728def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext): 

729 args = [eval_node(arg, context) for arg in node.args] 

730 kwargs = { 

731 k: v 

732 for kw in node.keywords 

733 for k, v in ( 

734 {kw.arg: eval_node(kw.value, context)} 

735 if kw.arg 

736 else eval_node(kw.value, context) 

737 ).items() 

738 } 

739 return args, kwargs 

740 

741 

742def _is_instance_attribute_assignment( 

743 target: ast.AST, context: EvaluationContext 

744) -> bool: 

745 """Return True if target is an attribute access on the instance argument.""" 

746 return ( 

747 context.class_transients is not None 

748 and context.instance_arg_name is not None 

749 and isinstance(target, ast.Attribute) 

750 and isinstance(getattr(target, "value", None), ast.Name) 

751 and getattr(target.value, "id", None) == context.instance_arg_name 

752 ) 

753 

754 

755def _get_coroutine_attributes() -> dict[str, Optional[object]]: 

756 async def _dummy(): 

757 return None 

758 

759 coro = _dummy() 

760 try: 

761 return {attr: getattr(coro, attr, None) for attr in dir(coro)} 

762 finally: 

763 coro.close() 

764 

765 

766def eval_node(node: Union[ast.AST, None], context: EvaluationContext): 

767 """Evaluate AST node in provided context. 

768 

769 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. 

770 

771 Does not evaluate actions that always have side effects: 

772 

773 - class definitions (``class sth: ...``) 

774 - function definitions (``def sth: ...``) 

775 - variable assignments (``x = 1``) 

776 - augmented assignments (``x += 1``) 

777 - deletions (``del x``) 

778 

779 Does not evaluate operations which do not return values: 

780 

781 - assertions (``assert x``) 

782 - pass (``pass``) 

783 - imports (``import x``) 

784 - control flow: 

785 

786 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``) 

787 - loops (``for`` and ``while``) 

788 - exception handling 

789 

790 The purpose of this function is to guard against unwanted side-effects; 

791 it does not give guarantees on protection from malicious code execution. 

792 """ 

793 policy = get_policy(context) 

794 

795 if node is None: 

796 return None 

797 if isinstance(node, (ast.Interactive, ast.Module)): 

798 result = None 

799 for child_node in node.body: 

800 result = eval_node(child_node, context) 

801 return result 

802 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

803 is_async = isinstance(node, ast.AsyncFunctionDef) 

804 func_locals = context.transient_locals.copy() 

805 func_context = context.replace(transient_locals=func_locals) 

806 is_property = False 

807 is_static = False 

808 is_classmethod = False 

809 for decorator_node in node.decorator_list: 

810 try: 

811 decorator = eval_node(decorator_node, context) 

812 except NameError: 

813 # if the decorator is not yet defined this is fine 

814 # especialy because we don't handle imports yet 

815 continue 

816 if decorator is property: 

817 is_property = True 

818 elif decorator is staticmethod: 

819 is_static = True 

820 elif decorator is classmethod: 

821 is_classmethod = True 

822 

823 if func_context.class_transients is not None: 

824 if not is_static and not is_classmethod: 

825 func_context.instance_arg_name = ( 

826 node.args.args[0].arg if node.args.args else None 

827 ) 

828 

829 return_type = eval_node(node.returns, context=context) 

830 

831 for child_node in node.body: 

832 eval_node(child_node, func_context) 

833 

834 if is_property: 

835 if return_type is not None: 

836 if _is_type_annotation(return_type): 

837 context.transient_locals[node.name] = _resolve_annotation( 

838 return_type, context 

839 ) 

840 else: 

841 context.transient_locals[node.name] = return_type 

842 else: 

843 return_value = _infer_return_value(node, func_context) 

844 context.transient_locals[node.name] = return_value 

845 

846 return None 

847 

848 def dummy_function(*args, **kwargs): 

849 pass 

850 

851 if return_type is not None: 

852 if _is_type_annotation(return_type): 

853 dummy_function.__annotations__["return"] = return_type 

854 else: 

855 dummy_function.__inferred_return__ = return_type 

856 else: 

857 inferred_return = _infer_return_value(node, func_context) 

858 if inferred_return is not None: 

859 dummy_function.__inferred_return__ = inferred_return 

860 

861 dummy_function.__name__ = node.name 

862 dummy_function.__node__ = node 

863 dummy_function.__is_async__ = is_async 

864 context.transient_locals[node.name] = dummy_function 

865 return None 

866 if isinstance(node, ast.Lambda): 

867 

868 def dummy_function(*args, **kwargs): 

869 pass 

870 

871 dummy_function.__inferred_return__ = eval_node(node.body, context) 

872 return dummy_function 

873 if isinstance(node, ast.ClassDef): 

874 # TODO support class decorators? 

875 class_locals = {} 

876 outer_locals = context.locals.copy() 

877 outer_locals.update(context.transient_locals) 

878 class_context = context.replace( 

879 transient_locals=class_locals, locals=outer_locals 

880 ) 

881 class_context.class_transients = class_locals 

882 for child_node in node.body: 

883 eval_node(child_node, class_context) 

884 bases = tuple([eval_node(base, context) for base in node.bases]) 

885 dummy_class = type(node.name, bases, class_locals) 

886 context.transient_locals[node.name] = dummy_class 

887 return None 

888 if isinstance(node, ast.Await): 

889 value = eval_node(node.value, context) 

890 if hasattr(value, "__awaited_type__"): 

891 return value.__awaited_type__ 

892 return value 

893 if isinstance(node, ast.While): 

894 loop_locals = context.transient_locals.copy() 

895 loop_context = context.replace(transient_locals=loop_locals) 

896 

897 result = None 

898 for stmt in node.body: 

899 result = eval_node(stmt, loop_context) 

900 

901 policy = get_policy(context) 

902 merged_locals = _merge_dicts_by_key( 

903 [loop_locals, context.transient_locals.copy()], policy 

904 ) 

905 context.transient_locals.update(merged_locals) 

906 

907 return result 

908 if isinstance(node, ast.For): 

909 try: 

910 iterable = eval_node(node.iter, context) 

911 except Exception: 

912 iterable = None 

913 

914 sample = None 

915 if iterable is not None: 

916 try: 

917 if policy.can_call(getattr(iterable, "__iter__", None)): 

918 sample = next(iter(iterable)) 

919 except Exception: 

920 sample = None 

921 

922 loop_locals = context.transient_locals.copy() 

923 loop_context = context.replace(transient_locals=loop_locals) 

924 

925 if sample is not None: 

926 try: 

927 fake_assign = ast.Assign( 

928 targets=[node.target], value=ast.Constant(value=sample) 

929 ) 

930 _handle_assign(fake_assign, loop_context) 

931 except Exception: 

932 pass 

933 

934 result = None 

935 for stmt in node.body: 

936 result = eval_node(stmt, loop_context) 

937 

938 policy = get_policy(context) 

939 merged_locals = _merge_dicts_by_key( 

940 [loop_locals, context.transient_locals.copy()], policy 

941 ) 

942 context.transient_locals.update(merged_locals) 

943 

944 return result 

945 if isinstance(node, ast.If): 

946 branches = [] 

947 current = node 

948 result = None 

949 while True: 

950 branch_locals = context.transient_locals.copy() 

951 branch_context = context.replace(transient_locals=branch_locals) 

952 for stmt in current.body: 

953 result = eval_node(stmt, branch_context) 

954 branches.append(branch_locals) 

955 if not current.orelse: 

956 break 

957 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If): 

958 # It's an elif - continue loop 

959 current = current.orelse[0] 

960 else: 

961 # It's an else block - process and break 

962 else_locals = context.transient_locals.copy() 

963 else_context = context.replace(transient_locals=else_locals) 

964 for stmt in current.orelse: 

965 result = eval_node(stmt, else_context) 

966 branches.append(else_locals) 

967 break 

968 branches.append(context.transient_locals.copy()) 

969 policy = get_policy(context) 

970 merged_locals = _merge_dicts_by_key(branches, policy) 

971 context.transient_locals.update(merged_locals) 

972 return result 

973 if isinstance(node, ast.Assign): 

974 return _handle_assign(node, context) 

975 if isinstance(node, ast.AnnAssign): 

976 return _handle_annassign(node, context) 

977 if isinstance(node, ast.Expression): 

978 return eval_node(node.body, context) 

979 if isinstance(node, ast.Expr): 

980 return eval_node(node.value, context) 

981 if isinstance(node, ast.Pass): 

982 return None 

983 if isinstance(node, ast.Import): 

984 # TODO: populate transient_locals 

985 return None 

986 if isinstance(node, (ast.AugAssign, ast.Delete)): 

987 return None 

988 if isinstance(node, (ast.Global, ast.Nonlocal)): 

989 return None 

990 if isinstance(node, ast.BinOp): 

991 left = eval_node(node.left, context) 

992 right = eval_node(node.right, context) 

993 if ( 

994 isinstance(node.op, ast.BitOr) 

995 and _is_type_annotation(left) 

996 and _is_type_annotation(right) 

997 ): 

998 left_duck = ( 

999 _Duck(dict.fromkeys(dir(left))) 

1000 if policy.can_call(left.__dir__) 

1001 else _Duck() 

1002 ) 

1003 right_duck = ( 

1004 _Duck(dict.fromkeys(dir(right))) 

1005 if policy.can_call(right.__dir__) 

1006 else _Duck() 

1007 ) 

1008 value_node = context.current_value 

1009 if value_node is not None and isinstance(value_node, ast.Dict): 

1010 if dict in [left, right]: 

1011 return _merge_values( 

1012 [left_duck, right_duck, ast.literal_eval(value_node)], 

1013 policy=get_policy(context), 

1014 ) 

1015 return _merge_values([left_duck, right_duck], policy=get_policy(context)) 

1016 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) 

1017 if dunders: 

1018 if policy.can_operate(dunders, left, right): 

1019 return getattr(left, dunders[0])(right) 

1020 else: 

1021 raise GuardRejection( 

1022 f"Operation (`{dunders}`) for", 

1023 type(left), 

1024 f"not allowed in {context.evaluation} mode", 

1025 ) 

1026 if isinstance(node, ast.Compare): 

1027 left = eval_node(node.left, context) 

1028 all_true = True 

1029 negate = False 

1030 for op, right in zip(node.ops, node.comparators): 

1031 right = eval_node(right, context) 

1032 dunder = None 

1033 dunders = _find_dunder(op, COMP_OP_DUNDERS) 

1034 if not dunders: 

1035 if isinstance(op, ast.NotIn): 

1036 dunders = COMP_OP_DUNDERS[ast.In] 

1037 negate = True 

1038 if isinstance(op, ast.Is): 

1039 dunder = "is_" 

1040 if isinstance(op, ast.IsNot): 

1041 dunder = "is_" 

1042 negate = True 

1043 if not dunder and dunders: 

1044 dunder = dunders[0] 

1045 if dunder: 

1046 a, b = (right, left) if dunder == "__contains__" else (left, right) 

1047 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): 

1048 result = getattr(operator, dunder)(a, b) 

1049 if negate: 

1050 result = not result 

1051 if not result: 

1052 all_true = False 

1053 left = right 

1054 else: 

1055 raise GuardRejection( 

1056 f"Comparison (`{dunder}`) for", 

1057 type(left), 

1058 f"not allowed in {context.evaluation} mode", 

1059 ) 

1060 else: 

1061 raise ValueError( 

1062 f"Comparison `{dunder}` not supported" 

1063 ) # pragma: no cover 

1064 return all_true 

1065 if isinstance(node, ast.Constant): 

1066 return node.value 

1067 if isinstance(node, ast.Tuple): 

1068 return tuple(eval_node(e, context) for e in node.elts) 

1069 if isinstance(node, ast.List): 

1070 return [eval_node(e, context) for e in node.elts] 

1071 if isinstance(node, ast.Set): 

1072 return {eval_node(e, context) for e in node.elts} 

1073 if isinstance(node, ast.Dict): 

1074 return dict( 

1075 zip( 

1076 [eval_node(k, context) for k in node.keys], 

1077 [eval_node(v, context) for v in node.values], 

1078 ) 

1079 ) 

1080 if isinstance(node, ast.Slice): 

1081 return slice( 

1082 eval_node(node.lower, context), 

1083 eval_node(node.upper, context), 

1084 eval_node(node.step, context), 

1085 ) 

1086 if isinstance(node, ast.UnaryOp): 

1087 value = eval_node(node.operand, context) 

1088 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) 

1089 if dunders: 

1090 if policy.can_operate(dunders, value): 

1091 try: 

1092 return getattr(value, dunders[0])() 

1093 except AttributeError: 

1094 raise TypeError( 

1095 f"bad operand type for unary {node.op}: {type(value)}" 

1096 ) 

1097 else: 

1098 raise GuardRejection( 

1099 f"Operation (`{dunders}`) for", 

1100 type(value), 

1101 f"not allowed in {context.evaluation} mode", 

1102 ) 

1103 if isinstance(node, ast.Subscript): 

1104 value = eval_node(node.value, context) 

1105 slice_ = eval_node(node.slice, context) 

1106 if policy.can_get_item(value, slice_): 

1107 return value[slice_] 

1108 raise GuardRejection( 

1109 "Subscript access (`__getitem__`) for", 

1110 type(value), # not joined to avoid calling `repr` 

1111 f" not allowed in {context.evaluation} mode", 

1112 ) 

1113 if isinstance(node, ast.Name): 

1114 return _eval_node_name(node.id, context) 

1115 if isinstance(node, ast.Attribute): 

1116 if ( 

1117 context.class_transients is not None 

1118 and isinstance(node.value, ast.Name) 

1119 and node.value.id == context.instance_arg_name 

1120 ): 

1121 return context.class_transients.get(node.attr) 

1122 value = eval_node(node.value, context) 

1123 if policy.can_get_attr(value, node.attr): 

1124 return getattr(value, node.attr) 

1125 try: 

1126 cls = ( 

1127 value if isinstance(value, type) else getattr(value, "__class__", None) 

1128 ) 

1129 if cls is not None: 

1130 resolved_hints = get_type_hints( 

1131 cls, 

1132 globalns=(context.globals or {}), 

1133 localns=(context.locals or {}), 

1134 ) 

1135 if node.attr in resolved_hints: 

1136 annotated = resolved_hints[node.attr] 

1137 return _resolve_annotation(annotated, context) 

1138 except Exception: 

1139 # Fall through to the guard rejection 

1140 pass 

1141 raise GuardRejection( 

1142 "Attribute access (`__getattr__`) for", 

1143 type(value), # not joined to avoid calling `repr` 

1144 f"not allowed in {context.evaluation} mode", 

1145 ) 

1146 if isinstance(node, ast.IfExp): 

1147 test = eval_node(node.test, context) 

1148 if test: 

1149 return eval_node(node.body, context) 

1150 else: 

1151 return eval_node(node.orelse, context) 

1152 if isinstance(node, ast.Call): 

1153 func = eval_node(node.func, context) 

1154 if policy.can_call(func): 

1155 args, kwargs = _extract_args_and_kwargs(node, context) 

1156 return func(*args, **kwargs) 

1157 if isclass(func): 

1158 # this code path gets entered when calling class e.g. `MyClass()` 

1159 # or `my_instance.__class__()` - in both cases `func` is `MyClass`. 

1160 # Should return `MyClass` if `__new__` is not overridden, 

1161 # otherwise whatever `__new__` return type is. 

1162 overridden_return_type = _eval_return_type(func.__new__, node, context) 

1163 if overridden_return_type is not NOT_EVALUATED: 

1164 return overridden_return_type 

1165 return _create_duck_for_heap_type(func) 

1166 else: 

1167 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED) 

1168 return_type = _eval_return_type(func, node, context) 

1169 if getattr(func, "__is_async__", False): 

1170 awaited_type = ( 

1171 inferred_return if inferred_return is not None else return_type 

1172 ) 

1173 coroutine_duck = _Duck(attributes=_get_coroutine_attributes()) 

1174 coroutine_duck.__awaited_type__ = awaited_type 

1175 return coroutine_duck 

1176 if inferred_return is not NOT_EVALUATED: 

1177 return inferred_return 

1178 if return_type is not NOT_EVALUATED: 

1179 return return_type 

1180 raise GuardRejection( 

1181 "Call for", 

1182 func, # not joined to avoid calling `repr` 

1183 f"not allowed in {context.evaluation} mode", 

1184 ) 

1185 if isinstance(node, ast.Assert): 

1186 # message is always the second item, so if it is defined user would be completing 

1187 # on the message, not on the assertion test 

1188 if node.msg: 

1189 return eval_node(node.msg, context) 

1190 return eval_node(node.test, context) 

1191 return None 

1192 

1193 

1194def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy): 

1195 """Merge multiple dictionaries, combining values for each key.""" 

1196 if len(dicts) == 1: 

1197 return dicts[0] 

1198 

1199 all_keys = set() 

1200 for d in dicts: 

1201 all_keys.update(d.keys()) 

1202 

1203 merged = {} 

1204 for key in all_keys: 

1205 values = [d[key] for d in dicts if key in d] 

1206 if values: 

1207 merged[key] = _merge_values(values, policy) 

1208 

1209 return merged 

1210 

1211 

1212def _merge_values(values, policy: EvaluationPolicy): 

1213 """Recursively merge multiple values, combining attributes and dict items.""" 

1214 if len(values) == 1: 

1215 return values[0] 

1216 

1217 types = {type(v) for v in values} 

1218 merged_items = None 

1219 key_values = {} 

1220 attributes = set() 

1221 for v in values: 

1222 if policy.can_call(v.__dir__): 

1223 attributes.update(dir(v)) 

1224 try: 

1225 if policy.can_call(v.items): 

1226 try: 

1227 for k, val in v.items(): 

1228 key_values.setdefault(k, []).append(val) 

1229 except Exception as e: 

1230 pass 

1231 elif policy.can_call(v.keys): 

1232 try: 

1233 for k in v.keys(): 

1234 key_values.setdefault(k, []).append(None) 

1235 except Exception as e: 

1236 pass 

1237 except Exception as e: 

1238 pass 

1239 

1240 if key_values: 

1241 merged_items = { 

1242 k: _merge_values(vals, policy) if vals[0] is not None else None 

1243 for k, vals in key_values.items() 

1244 } 

1245 

1246 if len(types) == 1: 

1247 t = next(iter(types)) 

1248 if t not in (dict,) and not ( 

1249 hasattr(next(iter(values)), "__getitem__") 

1250 and ( 

1251 hasattr(next(iter(values)), "items") 

1252 or hasattr(next(iter(values)), "keys") 

1253 ) 

1254 ): 

1255 if t in (list, set, tuple): 

1256 return t 

1257 return values[0] 

1258 

1259 return _Duck(attributes=dict.fromkeys(attributes), items=merged_items) 

1260 

1261 

1262def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext): 

1263 """Infer the return value(s) of a function by evaluating all return statements.""" 

1264 return_values = _collect_return_values(node.body, context) 

1265 

1266 if not return_values: 

1267 return None 

1268 if len(return_values) == 1: 

1269 return return_values[0] 

1270 

1271 policy = get_policy(context) 

1272 return _merge_values(return_values, policy) 

1273 

1274 

1275def _collect_return_values(body, context): 

1276 """Recursively collect return values from a list of AST statements.""" 

1277 return_values = [] 

1278 for stmt in body: 

1279 if isinstance(stmt, ast.Return): 

1280 if stmt.value is None: 

1281 continue 

1282 try: 

1283 value = eval_node(stmt.value, context) 

1284 if value is not None and value is not NOT_EVALUATED: 

1285 return_values.append(value) 

1286 except Exception: 

1287 pass 

1288 if isinstance( 

1289 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda) 

1290 ): 

1291 continue 

1292 elif hasattr(stmt, "body") and isinstance(stmt.body, list): 

1293 return_values.extend(_collect_return_values(stmt.body, context)) 

1294 if isinstance(stmt, ast.Try): 

1295 for h in stmt.handlers: 

1296 if hasattr(h, "body"): 

1297 return_values.extend(_collect_return_values(h.body, context)) 

1298 if hasattr(stmt, "orelse"): 

1299 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1300 if hasattr(stmt, "finalbody"): 

1301 return_values.extend(_collect_return_values(stmt.finalbody, context)) 

1302 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list): 

1303 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1304 return return_values 

1305 

1306 

1307def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext): 

1308 """Evaluate return type of a given callable function. 

1309 

1310 Returns the built-in type, a duck or NOT_EVALUATED sentinel. 

1311 """ 

1312 try: 

1313 sig = signature(func) 

1314 except ValueError: 

1315 sig = UNKNOWN_SIGNATURE 

1316 # if annotation was not stringized, or it was stringized 

1317 # but resolved by signature call we know the return type 

1318 not_empty = sig.return_annotation is not Signature.empty 

1319 if not_empty: 

1320 return _resolve_annotation(sig.return_annotation, context, sig, func, node) 

1321 return NOT_EVALUATED 

1322 

1323 

1324def _eval_annotation( 

1325 annotation: str, 

1326 context: EvaluationContext, 

1327): 

1328 return ( 

1329 _eval_node_name(annotation, context) 

1330 if isinstance(annotation, str) 

1331 else annotation 

1332 ) 

1333 

1334 

1335class _GetItemDuck(dict): 

1336 """A dict subclass that always returns the factory instance and claims to have any item.""" 

1337 

1338 def __init__(self, factory, *args, **kwargs): 

1339 super().__init__(*args, **kwargs) 

1340 self._factory = factory 

1341 

1342 def __getitem__(self, key): 

1343 return self._factory() 

1344 

1345 def __contains__(self, key): 

1346 return True 

1347 

1348 

1349def _resolve_annotation( 

1350 annotation: object | str, 

1351 context: EvaluationContext, 

1352 sig: Signature | None = None, 

1353 func: Callable | None = None, 

1354 node: ast.Call | None = None, 

1355): 

1356 """Resolve annotation created by user with `typing` module and custom objects.""" 

1357 if annotation is None: 

1358 return None 

1359 annotation = _eval_annotation(annotation, context) 

1360 origin = get_origin(annotation) 

1361 if annotation is Self and func and hasattr(func, "__self__"): 

1362 return func.__self__ 

1363 elif origin is Literal: 

1364 type_args = get_args(annotation) 

1365 if len(type_args) == 1: 

1366 return type_args[0] 

1367 elif annotation is LiteralString: 

1368 return "" 

1369 elif annotation is AnyStr: 

1370 index = None 

1371 if func and hasattr(func, "__node__"): 

1372 def_node = func.__node__ 

1373 for i, arg in enumerate(def_node.args.args): 

1374 if not arg.annotation: 

1375 continue 

1376 annotation = _eval_annotation(arg.annotation.id, context) 

1377 if annotation is AnyStr: 

1378 index = i 

1379 break 

1380 is_bound_method = ( 

1381 isinstance(func, MethodType) and getattr(func, "__self__") is not None 

1382 ) 

1383 if index and is_bound_method: 

1384 index -= 1 

1385 elif sig: 

1386 for i, (key, value) in enumerate(sig.parameters.items()): 

1387 if value.annotation is AnyStr: 

1388 index = i 

1389 break 

1390 if index is None: 

1391 return None 

1392 if index < 0 or index >= len(node.args): 

1393 return None 

1394 return eval_node(node.args[index], context) 

1395 elif origin is TypeGuard: 

1396 return False 

1397 elif origin is set or origin is list: 

1398 # only one type argument allowed 

1399 attributes = [ 

1400 attr 

1401 for attr in dir( 

1402 _resolve_annotation(get_args(annotation)[0], context, sig, func, node) 

1403 ) 

1404 ] 

1405 duck = _Duck(attributes=dict.fromkeys(attributes)) 

1406 return _Duck( 

1407 attributes=dict.fromkeys(dir(origin())), 

1408 # items are not strrictly needed for set 

1409 items=_GetItemDuck(lambda: duck), 

1410 ) 

1411 elif origin is tuple: 

1412 # multiple type arguments 

1413 return tuple( 

1414 _resolve_annotation(arg, context, sig, func, node) 

1415 for arg in get_args(annotation) 

1416 ) 

1417 elif origin is Union: 

1418 # multiple type arguments 

1419 attributes = [ 

1420 attr 

1421 for type_arg in get_args(annotation) 

1422 for attr in dir(_resolve_annotation(type_arg, context, sig, func, node)) 

1423 ] 

1424 return _Duck(attributes=dict.fromkeys(attributes)) 

1425 elif is_typeddict(annotation): 

1426 return _Duck( 

1427 attributes=dict.fromkeys(dir(dict())), 

1428 items={ 

1429 k: _resolve_annotation(v, context, sig, func, node) 

1430 for k, v in annotation.__annotations__.items() 

1431 }, 

1432 ) 

1433 elif hasattr(annotation, "_is_protocol"): 

1434 return _Duck(attributes=dict.fromkeys(dir(annotation))) 

1435 elif origin is Annotated: 

1436 type_arg = get_args(annotation)[0] 

1437 return _resolve_annotation(type_arg, context, sig, func, node) 

1438 elif isinstance(annotation, NewType): 

1439 return _eval_or_create_duck(annotation.__supertype__, context) 

1440 elif isinstance(annotation, TypeAliasType): 

1441 return _eval_or_create_duck(annotation.__value__, context) 

1442 else: 

1443 return _eval_or_create_duck(annotation, context) 

1444 

1445 

1446def _eval_node_name(node_id: str, context: EvaluationContext): 

1447 policy = get_policy(context) 

1448 if node_id in context.transient_locals: 

1449 return context.transient_locals[node_id] 

1450 if policy.allow_locals_access and node_id in context.locals: 

1451 return context.locals[node_id] 

1452 if policy.allow_globals_access and node_id in context.globals: 

1453 return context.globals[node_id] 

1454 if policy.allow_builtins_access and hasattr(builtins, node_id): 

1455 # note: do not use __builtins__, it is implementation detail of cPython 

1456 return getattr(builtins, node_id) 

1457 if policy.allow_auto_import and context.auto_import: 

1458 return context.auto_import(node_id) 

1459 if not policy.allow_globals_access and not policy.allow_locals_access: 

1460 raise GuardRejection( 

1461 f"Namespace access not allowed in {context.evaluation} mode" 

1462 ) 

1463 else: 

1464 raise NameError(f"{node_id} not found in locals, globals, nor builtins") 

1465 

1466 

1467def _eval_or_create_duck(duck_type, context: EvaluationContext): 

1468 policy = get_policy(context) 

1469 # if allow-listed builtin is on type annotation, instantiate it 

1470 if policy.can_call(duck_type): 

1471 return duck_type() 

1472 # if custom class is in type annotation, mock it 

1473 return _create_duck_for_heap_type(duck_type) 

1474 

1475 

1476def _create_duck_for_heap_type(duck_type): 

1477 """Create an imitation of an object of a given type (a duck). 

1478 

1479 Returns the duck or NOT_EVALUATED sentinel if duck could not be created. 

1480 """ 

1481 duck = ImpersonatingDuck() 

1482 try: 

1483 # this only works for heap types, not builtins 

1484 duck.__class__ = duck_type 

1485 return duck 

1486 except TypeError: 

1487 pass 

1488 return NOT_EVALUATED 

1489 

1490 

1491SUPPORTED_EXTERNAL_GETITEM = { 

1492 ("pandas", "core", "indexing", "_iLocIndexer"), 

1493 ("pandas", "core", "indexing", "_LocIndexer"), 

1494 ("pandas", "DataFrame"), 

1495 ("pandas", "Series"), 

1496 ("numpy", "ndarray"), 

1497 ("numpy", "void"), 

1498} 

1499 

1500 

1501BUILTIN_GETITEM: set[InstancesHaveGetItem] = { 

1502 dict, 

1503 str, # type: ignore[arg-type] 

1504 bytes, # type: ignore[arg-type] 

1505 list, 

1506 tuple, 

1507 type, # for type annotations like list[str] 

1508 _Duck, 

1509 collections.defaultdict, 

1510 collections.deque, 

1511 collections.OrderedDict, 

1512 collections.ChainMap, 

1513 collections.UserDict, 

1514 collections.UserList, 

1515 collections.UserString, # type: ignore[arg-type] 

1516 _DummyNamedTuple, 

1517 _IdentitySubscript, 

1518} 

1519 

1520 

1521def _list_methods(cls, source=None): 

1522 """For use on immutable objects or with methods returning a copy""" 

1523 return [getattr(cls, k) for k in (source if source else dir(cls))] 

1524 

1525 

1526dict_non_mutating_methods = ("copy", "keys", "values", "items") 

1527list_non_mutating_methods = ("copy", "index", "count") 

1528set_non_mutating_methods = set(dir(set)) & set(dir(frozenset)) 

1529 

1530 

1531dict_keys: type[collections.abc.KeysView] = type({}.keys()) 

1532dict_values: type = type({}.values()) 

1533dict_items: type = type({}.items()) 

1534 

1535NUMERICS = {int, float, complex} 

1536 

1537ALLOWED_CALLS = { 

1538 bytes, 

1539 *_list_methods(bytes), 

1540 bytes.__iter__, 

1541 dict, 

1542 *_list_methods(dict, dict_non_mutating_methods), 

1543 dict.__iter__, 

1544 dict_keys.__iter__, 

1545 dict_values.__iter__, 

1546 dict_items.__iter__, 

1547 dict_keys.isdisjoint, 

1548 list, 

1549 *_list_methods(list, list_non_mutating_methods), 

1550 list.__iter__, 

1551 set, 

1552 *_list_methods(set, set_non_mutating_methods), 

1553 set.__iter__, 

1554 frozenset, 

1555 *_list_methods(frozenset), 

1556 frozenset.__iter__, 

1557 range, 

1558 range.__iter__, 

1559 str, 

1560 *_list_methods(str), 

1561 str.__iter__, 

1562 tuple, 

1563 *_list_methods(tuple), 

1564 tuple.__iter__, 

1565 bool, 

1566 *_list_methods(bool), 

1567 *NUMERICS, 

1568 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)], 

1569 collections.deque, 

1570 *_list_methods(collections.deque, list_non_mutating_methods), 

1571 collections.deque.__iter__, 

1572 collections.defaultdict, 

1573 *_list_methods(collections.defaultdict, dict_non_mutating_methods), 

1574 collections.defaultdict.__iter__, 

1575 collections.OrderedDict, 

1576 *_list_methods(collections.OrderedDict, dict_non_mutating_methods), 

1577 collections.OrderedDict.__iter__, 

1578 collections.UserDict, 

1579 *_list_methods(collections.UserDict, dict_non_mutating_methods), 

1580 collections.UserDict.__iter__, 

1581 collections.UserList, 

1582 *_list_methods(collections.UserList, list_non_mutating_methods), 

1583 collections.UserList.__iter__, 

1584 collections.UserString, 

1585 *_list_methods(collections.UserString, dir(str)), 

1586 collections.UserString.__iter__, 

1587 collections.Counter, 

1588 *_list_methods(collections.Counter, dict_non_mutating_methods), 

1589 collections.Counter.__iter__, 

1590 collections.Counter.elements, 

1591 collections.Counter.most_common, 

1592 object.__dir__, 

1593 type.__dir__, 

1594 _Duck.__dir__, 

1595} 

1596 

1597BUILTIN_GETATTR: set[MayHaveGetattr] = { 

1598 *BUILTIN_GETITEM, 

1599 set, 

1600 frozenset, 

1601 object, 

1602 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`. 

1603 *NUMERICS, 

1604 dict_keys, 

1605 MethodDescriptorType, 

1606 ModuleType, 

1607} 

1608 

1609 

1610BUILTIN_OPERATIONS = {*BUILTIN_GETATTR} 

1611 

1612EVALUATION_POLICIES = { 

1613 "minimal": EvaluationPolicy( 

1614 allow_builtins_access=True, 

1615 allow_locals_access=False, 

1616 allow_globals_access=False, 

1617 allow_item_access=False, 

1618 allow_attr_access=False, 

1619 allowed_calls=set(), 

1620 allow_any_calls=False, 

1621 allow_all_operations=False, 

1622 ), 

1623 "limited": SelectivePolicy( 

1624 allowed_getitem=BUILTIN_GETITEM, 

1625 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM, 

1626 allowed_getattr=BUILTIN_GETATTR, 

1627 allowed_getattr_external={ 

1628 # pandas Series/Frame implements custom `__getattr__` 

1629 ("pandas", "DataFrame"), 

1630 ("pandas", "Series"), 

1631 }, 

1632 allowed_operations=BUILTIN_OPERATIONS, 

1633 allow_builtins_access=True, 

1634 allow_locals_access=True, 

1635 allow_globals_access=True, 

1636 allow_getitem_on_types=True, 

1637 allowed_calls=ALLOWED_CALLS, 

1638 ), 

1639 "unsafe": EvaluationPolicy( 

1640 allow_builtins_access=True, 

1641 allow_locals_access=True, 

1642 allow_globals_access=True, 

1643 allow_attr_access=True, 

1644 allow_item_access=True, 

1645 allow_any_calls=True, 

1646 allow_all_operations=True, 

1647 ), 

1648} 

1649 

1650 

1651__all__ = [ 

1652 "guarded_eval", 

1653 "eval_node", 

1654 "GuardRejection", 

1655 "EvaluationContext", 

1656 "_unbind_method", 

1657]