Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

821 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import dataclasses 

23import operator 

24import sys 

25import typing 

26import warnings 

27from functools import cached_property 

28from dataclasses import dataclass, field 

29from types import MethodDescriptorType, ModuleType, MethodType 

30 

31from IPython.utils.decorators import undoc 

32 

33 

34from typing import Self, LiteralString, get_type_hints 

35 

36if sys.version_info < (3, 12): 

37 from typing_extensions import TypeAliasType 

38else: 

39 from typing import TypeAliasType 

40 

41 

@undoc
class HasGetItem(Protocol):
    """Structural type for objects that define ``__getitem__``."""

    def __getitem__(self, key) -> None:
        ...

46 

47 

@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type for callables whose result defines ``__getitem__``.

    Used as the element type of ``SelectivePolicy.allowed_getitem``.
    """

    def __call__(self, *args, **kwargs) -> HasGetItem:
        ...

52 

53 

@undoc
class HasGetAttr(Protocol):
    """Structural type for objects that explicitly define ``__getattr__``."""

    def __getattr__(self, key) -> None:
        ...

58 

59 

@undoc
class DoesNotHaveGetAttr(Protocol):
    """Empty structural type standing for objects without ``__getattr__``."""

    pass


# By default `__getattr__` is not explicitly implemented on most objects
# so the allow-list element type accepts both kinds of object.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]

67 

68 

69def _unbind_method(func: Callable) -> Union[Callable, None]: 

70 """Get unbound method for given bound method. 

71 

72 Returns None if cannot get unbound method, or method is already unbound. 

73 """ 

74 owner = getattr(func, "__self__", None) 

75 owner_class = type(owner) 

76 name = getattr(func, "__name__", None) 

77 instance_dict_overrides = getattr(owner, "__dict__", None) 

78 if ( 

79 owner is not None 

80 and name 

81 and ( 

82 not instance_dict_overrides 

83 or (instance_dict_overrides and name not in instance_dict_overrides) 

84 ) 

85 ): 

86 return getattr(owner_class, name) 

87 return None 

88 

89 

@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Each ``can_*`` method answers whether a given kind of action may be
    performed. The base implementation consults only the boolean flags and
    the ``allowed_calls`` set below; subclasses may refine the decisions.
    """

    # The four flags below and ``allow_auto_import`` are not read by any
    # method of this class; they are consumed elsewhere.
    allow_locals_access: bool = False
    allow_globals_access: bool = False
    # Returned as-is by ``can_get_item``.
    allow_item_access: bool = False
    # Returned as-is by ``can_get_attr``.
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    # When set, ``can_operate`` approves every operation.
    allow_all_operations: bool = False
    # When set, ``can_call`` approves every callable.
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    # Callables that ``can_call`` approves individually.
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return whether subscripting ``value`` with ``item`` is permitted."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return whether reading attribute ``attr`` of ``value`` is permitted."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether the operation implemented by ``dunders`` is permitted.

        The base policy ignores the operands and only honours
        ``allow_all_operations``.
        """
        # Always answer with a real boolean instead of falling through to
        # an implicit ``None``.
        return bool(self.allow_all_operations)

    def can_call(self, func):
        """Return whether calling ``func`` is permitted.

        A call is permitted when ``allow_any_calls`` is set, when ``func`` is
        in ``allowed_calls``, or when ``func`` is a bound method whose
        class-level counterpart (see ``_unbind_method``) is in
        ``allowed_calls``.
        """
        if self.allow_any_calls:
            return True

        if func in self.allowed_calls:
            return True

        owner_method = _unbind_method(func)

        # Explicit boolean result; previously a rejection fell off the end
        # of the function and produced ``None``.
        return bool(owner_method and owner_method in self.allowed_calls)

126 

127 

128def _get_external(module_name: str, access_path: Sequence[str]): 

129 """Get value from external module given a dotted access path. 

130 

131 Only gets value if the module is already imported. 

132 

133 Raises: 

134 * `KeyError` if module is removed not found, and 

135 * `AttributeError` if access path does not match an exported object 

136 """ 

137 try: 

138 member_type = sys.modules[module_name] 

139 # standard module 

140 for attr in access_path: 

141 member_type = getattr(member_type, attr) 

142 return member_type 

143 except (KeyError, AttributeError): 

144 # handle modules in namespace packages 

145 module_path = ".".join([module_name, *access_path]) 

146 if module_path in sys.modules: 

147 return sys.modules[module_path] 

148 raise 

149 

150 

def _has_original_dunder_external(
    value,
    module_name: str,
    access_path: Sequence[str],
    method_name: str,
):
    """Check ``value`` against one allow-list entry naming an external object.

    The entry (``module_name`` plus ``access_path``) is resolved with
    ``_get_external`` and may turn out to be a class or a module. The check
    succeeds (returns ``True``) when one of the following holds:

    * ``type(value)`` equals the resolved object;
    * the resolved object is a module and ``type(value)`` is defined in that
      module or in one of its submodules;
    * the resolved object is not a module, ``value`` is an instance of it and
      ``method_name`` looked up on both classes compares equal;
    * the resolved object is a module and ``method_name`` on ``type(value)``
      compares equal to the one found on a base class defined in that module
      or in one of its submodules.

    Returns ``False`` when the module is not present in ``sys.modules``, when
    resolution raises ``KeyError``/``AttributeError``, or when ``method_name``
    is ``"__getattribute__"`` and neither of the first two checks passed.
    When no check matches the function falls through and returns ``None``.
    """
    if module_name not in sys.modules:
        full_module_path = ".".join([module_name, *access_path])
        if full_module_path not in sys.modules:
            # LBYLB as it is faster
            return False
    try:
        member_type = _get_external(module_name, access_path)
        value_type = type(value)
        # Exact type match with the allow-listed object.
        if type(value) == member_type:
            return True
        if isinstance(member_type, ModuleType):
            # The entry names a whole module: accept types defined in it or
            # in any of its submodules.
            value_module = getmodule(value_type)
            if not value_module or not value_module.__name__:
                return False
            if (
                value_module.__name__ == member_type.__name__
                or value_module.__name__.startswith(member_type.__name__ + ".")
            ):
                return True
        if method_name == "__getattribute__":
            # we have to short-circuit here due to an unresolved issue in
            # `isinstance` implementation: https://bugs.python.org/issue32683
            return False
        if not isinstance(member_type, ModuleType) and isinstance(value, member_type):
            # Subclass of an allow-listed class: accept only if the dunder
            # is the same object as on the allow-listed class.
            method = getattr(value_type, method_name, None)
            member_method = getattr(member_type, method_name, None)
            if member_method == method:
                return True
        if isinstance(member_type, ModuleType):
            method = getattr(value_type, method_name, None)
            # Skip the first MRO entry, which is ``value_type`` itself.
            for base_class in value_type.__mro__[1:]:
                base_module = getmodule(base_class)
                if base_module and (
                    base_module.__name__ == member_type.__name__
                    or base_module.__name__.startswith(member_type.__name__ + ".")
                ):
                    # Check if the method comes from this trusted base class
                    base_method = getattr(base_class, method_name, None)
                    if base_method is not None and base_method == method:
                        return True
    except (AttributeError, KeyError):
        return False

199 

200 

201def _has_original_dunder( 

202 value, allowed_types, allowed_methods, allowed_external, method_name 

203): 

204 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

205 # we only need to check at class level 

206 value_type = type(value) 

207 

208 # strict type check passes → no need to check method 

209 if value_type in allowed_types: 

210 return True 

211 

212 method = getattr(value_type, method_name, None) 

213 

214 if method is None: 

215 return None 

216 

217 if method in allowed_methods: 

218 return True 

219 

220 for module_name, *access_path in allowed_external: 

221 if _has_original_dunder_external(value, module_name, access_path, method_name): 

222 return True 

223 

224 return False 

225 

226 

227def _coerce_path_to_tuples( 

228 allow_list: set[tuple[str, ...] | str], 

229) -> set[tuple[str, ...]]: 

230 """Replace dotted paths on the provided allow-list with tuples.""" 

231 return { 

232 path if isinstance(path, tuple) else tuple(path.split(".")) 

233 for path in allow_list 

234 } 

235 

236 

@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy that permits actions only on allow-listed types.

    Item access, attribute access and operators are permitted when the
    relevant dunder method of the value's type is unmodified relative to an
    allow-listed type. Allow-lists come in two forms: sets of objects, and
    "external" sets of dotted paths (or tuples of path components) that are
    resolved against ``sys.modules`` without importing anything.
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)

    # When set, subscripting typing constructs and classes that define
    # ``__class_getitem__`` is permitted (see ``can_get_item``).
    allow_getitem_on_types: bool = field(default_factory=bool)

    # Per-dunder cache filled lazily by ``_operator_dunder_methods``.
    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Return whether reading attribute ``attr`` of ``value`` is permitted.

        The value's type must have allow-listed ``__getattribute__`` and,
        when it defines one, allow-listed ``__getattr__``. If ``attr`` is a
        property on the type, it is accepted only when the exact type is in
        ``allowed_getattr`` or the property compares equal to the same-named
        attribute of one of the ``allowed_getattr_external`` entries.
        """
        allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)

        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattr__",
        )

        accept = False

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if accept:
            # We still need to check for overridden properties.

            value_class = type(value)
            if not hasattr(value_class, attr):
                return True

            class_attr_val = getattr(value_class, attr)
            is_property = isinstance(class_attr_val, property)

            if not is_property:
                return True

            # Properties in allowed types are ok (although we do not include any
            # properties in our default allow list currently).
            if type(value) in self.allowed_getattr:
                return True  # pragma: no cover

            # Properties in subclasses of allowed types may be ok if not changed.
            # Every external entry is consulted: returning from inside the
            # first iteration would make the answer depend on the arbitrary
            # iteration order of the set.
            for module_name, *access_path in allowed_getattr_external:
                try:
                    external_class = _get_external(module_name, access_path)
                    external_class_attr_val = getattr(external_class, attr)
                except (KeyError, AttributeError):
                    # This entry is not importable or lacks the attribute;
                    # another entry may still vouch for the property.
                    continue
                if class_attr_val == external_class_attr_val:
                    return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
        if self.allow_getitem_on_types:
            # e.g. Union[str, int] or Literal[True, 1]
            if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
                return True
            # PEP 560 e.g. list[str]
            if isinstance(value, type) and hasattr(value, "__class_getitem__"):
                return True
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether every dunder in ``dunders`` is allow-listed for the operands.

        ``b`` is checked only when it is not ``None``.
        """
        allowed_operations_external = _coerce_path_to_tuples(
            self.allowed_operations_external
        )
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and cache) the ``dunder`` methods of ``allowed_operations``."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        """``__getitem__`` implementations of the ``allowed_getitem`` entries."""
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        """``__getattr__`` implementations of the ``allowed_getattr`` entries."""
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        """``__getattribute__`` implementations of the ``allowed_getattr`` entries."""
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect attribute ``name`` from each class, skipping missing/falsy ones."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }

376 

377 

class _DummyNamedTuple(NamedTuple):
    """Used internally to retrieve methods of named tuple instance."""


# Names of the available evaluation modes. ``guarded_eval`` refuses to run
# anything in "forbidden" mode and hands the code to plain ``eval`` in
# "dangerous" mode; every other mode is evaluated node by node.
EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]

383 

384 

@dataclass
class EvaluationContext:
    """Namespaces and settings under which a piece of code is evaluated."""

    #: Local namespace
    locals: dict
    #: Global namespace
    globals: dict
    #: Evaluation policy identifier
    evaluation: EvaluationPolicyName = "forbidden"
    #: Whether the evaluation of code takes place inside of a subscript.
    #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
    in_subscript: bool = False
    #: Auto import method
    # NOTE(review): the annotation presumably means a callable taking a single
    # ``list[str]`` argument, normally spelled ``Callable[[list[str]], ModuleType]``
    # - confirm.
    auto_import: Callable[list[str], ModuleType] | None = None
    #: Overrides for evaluation policy
    policy_overrides: dict = field(default_factory=dict)
    #: Transient local namespace used to store mocks
    transient_locals: dict = field(default_factory=dict)
    #: Transients of class level
    class_transients: dict | None = None
    #: Instance variable name used in the method definition
    instance_arg_name: str | None = None

    def replace(self, /, **changes):
        """Return a new copy of the context, with specified changes"""
        # The copy is shallow: fields not listed in ``changes`` keep pointing
        # at the same objects as in ``self``.
        return dataclasses.replace(self, **changes)

410 

411 

class _IdentitySubscript:
    """Returns the key itself when item is requested via subscript."""

    def __getitem__(self, key):
        return key


# Shared instance that ``guarded_eval`` binds under ``SUBSCRIPT_MARKER``.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name that ``guarded_eval`` injects into the local namespace and puts in
# front of ``[code]`` when evaluating the inside of a subscript.
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# An empty ``inspect.Signature`` instance.
UNKNOWN_SIGNATURE = Signature()
# Unique sentinel object; compare against it with ``is``.
NOT_EVALUATED = object()

423 

424 

class GuardRejection(Exception):
    """Exception raised when guard rejects evaluation attempt."""

    # Raised, for example, by ``guarded_eval`` in "forbidden" mode and by
    # ``eval_node`` when the policy refuses an operation.
    pass

429 

430 

def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate provided code in the evaluation context.

    The evaluation policy named by the context decides what happens:

    - ``forbidden``: nothing is evaluated and :class:`GuardRejection` is raised;
    - ``dangerous``: the code is passed to the built-in :func:`eval`;
    - anything else: the code is parsed and the AST is handed to
      :func:`eval_node`.

    When ``context.in_subscript`` is set, ``code`` is treated as the inside
    of a subscript; empty code then yields an empty tuple.
    """
    original_locals = context.locals

    if context.evaluation == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: `ast.literal_eval` is not used because it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # Slice syntax (``:``) is only valid inside a subscript, so the code
        # is wrapped in a subscript of a marker object that hands the key
        # back unchanged instead of performing a real ``__getitem__``.
        if not code:
            return ()
        subscript_locals = original_locals.copy()
        subscript_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = "".join((SUBSCRIPT_MARKER, "[", code, "]"))
        context = context.replace(locals=subscript_locals)

    if context.evaluation == "dangerous":
        return eval(code, context.globals, context.locals)

    parsed = ast.parse(code, mode="exec")
    return eval_node(parsed, context)

465 

466 

# Maps AST operator node types to the dunder names passed to
# ``policy.can_operate``. Every listed name must pass the policy check;
# only the first one is invoked to compute the result.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}

# Comparison operators; the first name is looked up on the ``operator``
# module when the comparison is evaluated.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

# Unary operators; the first name is called on the operand with no arguments.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    # NOTE(review): ``__not__`` is a function of the ``operator`` module, not
    # a special method defined on objects - confirm ``not x`` can ever pass
    # the policy check.
    ast.Not: ("__not__",),
}

501 

502 

class ImpersonatingDuck:
    """A dummy class used to create objects of other classes without calling their ``__init__``"""

    # no-op: override __class__ to impersonate
    # (the class deliberately defines nothing of its own)

507 

508 

509class _Duck: 

510 """A dummy class used to create objects pretending to have given attributes""" 

511 

512 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

513 self.attributes = attributes if attributes is not None else {} 

514 self.items = items if items is not None else {} 

515 

516 def __getattr__(self, attr: str): 

517 return self.attributes[attr] 

518 

519 def __hasattr__(self, attr: str): 

520 return attr in self.attributes 

521 

522 def __dir__(self): 

523 return [*dir(super), *self.attributes] 

524 

525 def __getitem__(self, key: str): 

526 return self.items[key] 

527 

528 def __hasitem__(self, key: str): 

529 return self.items[key] 

530 

531 def _ipython_key_completions_(self): 

532 return self.items.keys() 

533 

534 

535def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

536 dunder = None 

537 for op, candidate_dunder in dunders.items(): 

538 if isinstance(node_op, op): 

539 dunder = candidate_dunder 

540 return dunder 

541 

542 

def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Build the effective policy for ``context``.

    A shallow copy of the policy registered under ``context.evaluation`` is
    made, and each entry of ``context.policy_overrides`` naming an existing
    attribute of the policy is applied to that copy. Unknown override names
    are ignored.
    """
    effective = copy(EVALUATION_POLICIES[context.evaluation])

    for option, override in context.policy_overrides.items():
        if not hasattr(effective, option):
            continue
        setattr(effective, option, override)
    return effective

550 

551 

def _validate_policy_overrides(
    policy_name: EvaluationPolicyName, policy_overrides: dict
) -> bool:
    """Check that every override names an attribute of the chosen policy.

    A warning is emitted for each override that does not. Returns ``True``
    when all overrides are valid and ``False`` otherwise.
    """
    base_policy = EVALUATION_POLICIES[policy_name]

    unknown = [name for name in policy_overrides if not hasattr(base_policy, name)]
    for name in unknown:
        warnings.warn(
            f"Override {name!r} is not valid with {policy_name!r} evaluation policy"
        )
    return not unknown

565 

566 

def _handle_assign(node: ast.Assign, context: EvaluationContext):
    """Record the effect of an assignment statement in the transient namespaces.

    The right-hand side is evaluated once and then bound for every target:

    - tuple/list targets are unpacked element by element (one starred
      element is supported);
    - subscript targets (``name[key] = v`` or ``self.attr[key] = v``) replace
      the stored container with a ``_Duck`` that carries the new item;
    - ``self.attr`` targets are stored in ``context.class_transients``;
    - any other target is stored under its ``id`` in
      ``context.transient_locals``.

    Always returns ``None``.

    Raises:
     * `NameError` if the container of a subscript target cannot be found.
    """
    value = eval_node(node.value, context)
    transient_locals = context.transient_locals
    policy = get_policy(context)
    class_transients = context.class_transients
    for target in node.targets:
        if isinstance(target, (ast.Tuple, ast.List)):
            # Handle unpacking assignment
            values = list(value)
            targets = target.elts
            starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]

            # Unified handling: treat no starred as starred at end
            star_or_last_idx = starred[0] if starred else len(targets)

            # Before starred
            for i in range(star_or_last_idx):
                # Check for self.x assignment
                if _is_instance_attribute_assignment(targets[i], context):
                    class_transients[targets[i].attr] = values[i]
                else:
                    transient_locals[targets[i].id] = values[i]

            # Starred if exists
            if starred:
                # Index one past the last value absorbed by the starred target.
                end = len(values) - (len(targets) - star_or_last_idx - 1)
                if _is_instance_attribute_assignment(
                    targets[star_or_last_idx], context
                ):
                    class_transients[targets[star_or_last_idx].attr] = values[
                        star_or_last_idx:end
                    ]
                else:
                    transient_locals[targets[star_or_last_idx].value.id] = values[
                        star_or_last_idx:end
                    ]

            # After starred: these targets are aligned with the end of the values
            for i in range(star_or_last_idx + 1, len(targets)):
                if _is_instance_attribute_assignment(targets[i], context):
                    class_transients[targets[i].attr] = values[
                        len(values) - (len(targets) - i)
                    ]
                else:
                    transient_locals[targets[i].id] = values[
                        len(values) - (len(targets) - i)
                    ]
        elif isinstance(target, ast.Subscript):
            if isinstance(target.value, ast.Name):
                name = target.value.id
                # Look the container up in order: transients, locals, globals.
                container = transient_locals.get(name)
                if container is None:
                    container = context.locals.get(name)
                if container is None:
                    container = context.globals.get(name)
                if container is None:
                    raise NameError(
                        f"{name} not found in locals, globals, nor builtins"
                    )
                storage_dict = transient_locals
                storage_key = name
            elif isinstance(
                target.value, ast.Attribute
            ) and _is_instance_attribute_assignment(target.value, context):
                attr = target.value.attr
                container = class_transients.get(attr, None)
                if container is None:
                    raise NameError(f"{attr} not found in class transients")
                storage_dict = class_transients
                storage_key = attr
            else:
                # Unsupported subscript target: skip only this target so the
                # remaining targets of a chained assignment are still handled.
                continue

            key = eval_node(target.slice, context)
            attributes = (
                dict.fromkeys(dir(container))
                if policy.can_call(container.__dir__)
                else {}
            )
            items = {}

            if policy.can_get_item(container, None):
                # Best effort: containers without a usable ``items()`` simply
                # contribute no pre-existing entries.
                try:
                    items = dict(container.items())
                except Exception:
                    pass

            items[key] = value
            # The real container is never mutated; a stand-in carrying the
            # new item is stored in its place.
            duck_container = _Duck(attributes=attributes, items=items)
            storage_dict[storage_key] = duck_container
        elif _is_instance_attribute_assignment(target, context):
            class_transients[target.attr] = value
        else:
            transient_locals[target.id] = value
    return None

662 

663 

def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
    """Evaluate the arguments of a call node.

    Returns a ``(args, kwargs)`` pair: a list with the evaluated positional
    arguments and a dict with the evaluated keyword arguments. A keyword
    without a name (``**mapping``) is evaluated and its items are merged
    into the dict.
    """
    positional = [eval_node(arg, context) for arg in node.args]

    keyword = {}
    for kw in node.keywords:
        evaluated = eval_node(kw.value, context)
        if kw.arg:
            keyword[kw.arg] = evaluated
        else:
            # ``**mapping`` unpacking: copy every entry over
            for name, entry in evaluated.items():
                keyword[name] = entry
    return positional, keyword

676 

677 

def _is_instance_attribute_assignment(
    target: ast.AST, context: EvaluationContext
) -> bool:
    """Return True if target is an attribute access on the instance argument.

    That requires the context to carry both class transients and an instance
    argument name, and ``target`` to be of the form ``<name>.<attr>`` where
    ``<name>`` equals that instance argument name.
    """
    if context.class_transients is None or context.instance_arg_name is None:
        return False
    if not isinstance(target, ast.Attribute):
        return False
    if not isinstance(getattr(target, "value", None), ast.Name):
        return False
    return getattr(target.value, "id", None) == context.instance_arg_name

689 

690 

691def _get_coroutine_attributes() -> dict[str, Optional[object]]: 

692 async def _dummy(): 

693 return None 

694 

695 coro = _dummy() 

696 try: 

697 return {attr: getattr(coro, attr, None) for attr in dir(coro)} 

698 finally: 

699 coro.close() 

700 

701 

702def eval_node(node: Union[ast.AST, None], context: EvaluationContext): 

703 """Evaluate AST node in provided context. 

704 

705 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. 

706 

707 Does not evaluate actions that always have side effects: 

708 

709 - class definitions (``class sth: ...``) 

710 - function definitions (``def sth: ...``) 

711 - variable assignments (``x = 1``) 

712 - augmented assignments (``x += 1``) 

713 - deletions (``del x``) 

714 

715 Does not evaluate operations which do not return values: 

716 

717 - assertions (``assert x``) 

718 - pass (``pass``) 

719 - imports (``import x``) 

720 - control flow: 

721 

722 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``) 

723 - loops (``for`` and ``while``) 

724 - exception handling 

725 

726 The purpose of this function is to guard against unwanted side-effects; 

727 it does not give guarantees on protection from malicious code execution. 

728 """ 

729 policy = get_policy(context) 

730 

731 if node is None: 

732 return None 

733 if isinstance(node, (ast.Interactive, ast.Module)): 

734 result = None 

735 for child_node in node.body: 

736 result = eval_node(child_node, context) 

737 return result 

738 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

739 is_async = isinstance(node, ast.AsyncFunctionDef) 

740 func_locals = context.transient_locals.copy() 

741 func_context = context.replace(transient_locals=func_locals) 

742 is_property = False 

743 is_static = False 

744 is_classmethod = False 

745 for decorator_node in node.decorator_list: 

746 try: 

747 decorator = eval_node(decorator_node, context) 

748 except NameError: 

749 # if the decorator is not yet defined this is fine 

750 # especialy because we don't handle imports yet 

751 continue 

752 if decorator is property: 

753 is_property = True 

754 elif decorator is staticmethod: 

755 is_static = True 

756 elif decorator is classmethod: 

757 is_classmethod = True 

758 

759 if func_context.class_transients is not None: 

760 if not is_static and not is_classmethod: 

761 func_context.instance_arg_name = ( 

762 node.args.args[0].arg if node.args.args else None 

763 ) 

764 

765 return_type = eval_node(node.returns, context=context) 

766 

767 for child_node in node.body: 

768 eval_node(child_node, func_context) 

769 

770 if is_property: 

771 if return_type is not None: 

772 context.transient_locals[node.name] = _resolve_annotation( 

773 return_type, context 

774 ) 

775 else: 

776 return_value = _infer_return_value(node, func_context) 

777 context.transient_locals[node.name] = return_value 

778 

779 return None 

780 

781 def dummy_function(*args, **kwargs): 

782 pass 

783 

784 if return_type is not None: 

785 dummy_function.__annotations__["return"] = return_type 

786 else: 

787 inferred_return = _infer_return_value(node, func_context) 

788 if inferred_return is not None: 

789 dummy_function.__inferred_return__ = inferred_return 

790 

791 dummy_function.__name__ = node.name 

792 dummy_function.__node__ = node 

793 dummy_function.__is_async__ = is_async 

794 context.transient_locals[node.name] = dummy_function 

795 return None 

796 if isinstance(node, ast.Lambda): 

797 

798 def dummy_function(*args, **kwargs): 

799 pass 

800 

801 dummy_function.__inferred_return__ = eval_node(node.body, context) 

802 return dummy_function 

803 if isinstance(node, ast.ClassDef): 

804 # TODO support class decorators? 

805 class_locals = {} 

806 outer_locals = context.locals.copy() 

807 outer_locals.update(context.transient_locals) 

808 class_context = context.replace( 

809 transient_locals=class_locals, locals=outer_locals 

810 ) 

811 class_context.class_transients = class_locals 

812 for child_node in node.body: 

813 eval_node(child_node, class_context) 

814 bases = tuple([eval_node(base, context) for base in node.bases]) 

815 dummy_class = type(node.name, bases, class_locals) 

816 context.transient_locals[node.name] = dummy_class 

817 return None 

818 if isinstance(node, ast.Await): 

819 value = eval_node(node.value, context) 

820 if hasattr(value, "__awaited_type__"): 

821 return value.__awaited_type__ 

822 return value 

823 if isinstance(node, ast.While): 

824 loop_locals = context.transient_locals.copy() 

825 loop_context = context.replace(transient_locals=loop_locals) 

826 

827 result = None 

828 for stmt in node.body: 

829 result = eval_node(stmt, loop_context) 

830 

831 policy = get_policy(context) 

832 merged_locals = _merge_dicts_by_key( 

833 [loop_locals, context.transient_locals.copy()], policy 

834 ) 

835 context.transient_locals.update(merged_locals) 

836 

837 return result 

838 if isinstance(node, ast.For): 

839 try: 

840 iterable = eval_node(node.iter, context) 

841 except Exception: 

842 iterable = None 

843 

844 sample = None 

845 if iterable is not None: 

846 try: 

847 if policy.can_call(getattr(iterable, "__iter__", None)): 

848 sample = next(iter(iterable)) 

849 except Exception: 

850 sample = None 

851 

852 loop_locals = context.transient_locals.copy() 

853 loop_context = context.replace(transient_locals=loop_locals) 

854 

855 if sample is not None: 

856 try: 

857 fake_assign = ast.Assign( 

858 targets=[node.target], value=ast.Constant(value=sample) 

859 ) 

860 _handle_assign(fake_assign, loop_context) 

861 except Exception: 

862 pass 

863 

864 result = None 

865 for stmt in node.body: 

866 result = eval_node(stmt, loop_context) 

867 

868 policy = get_policy(context) 

869 merged_locals = _merge_dicts_by_key( 

870 [loop_locals, context.transient_locals.copy()], policy 

871 ) 

872 context.transient_locals.update(merged_locals) 

873 

874 return result 

875 if isinstance(node, ast.If): 

876 branches = [] 

877 current = node 

878 result = None 

879 while True: 

880 branch_locals = context.transient_locals.copy() 

881 branch_context = context.replace(transient_locals=branch_locals) 

882 for stmt in current.body: 

883 result = eval_node(stmt, branch_context) 

884 branches.append(branch_locals) 

885 if not current.orelse: 

886 break 

887 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If): 

888 # It's an elif - continue loop 

889 current = current.orelse[0] 

890 else: 

891 # It's an else block - process and break 

892 else_locals = context.transient_locals.copy() 

893 else_context = context.replace(transient_locals=else_locals) 

894 for stmt in current.orelse: 

895 result = eval_node(stmt, else_context) 

896 branches.append(else_locals) 

897 break 

898 branches.append(context.transient_locals.copy()) 

899 policy = get_policy(context) 

900 merged_locals = _merge_dicts_by_key(branches, policy) 

901 context.transient_locals.update(merged_locals) 

902 return result 

903 if isinstance(node, ast.Assign): 

904 return _handle_assign(node, context) 

905 if isinstance(node, ast.AnnAssign): 

906 if node.simple: 

907 value = _resolve_annotation(eval_node(node.annotation, context), context) 

908 context.transient_locals[node.target.id] = value 

909 # Handle non-simple annotated assignments only for self.x: type = value 

910 if _is_instance_attribute_assignment(node.target, context): 

911 value = _resolve_annotation(eval_node(node.annotation, context), context) 

912 context.class_transients[node.target.attr] = value 

913 return None 

914 if isinstance(node, ast.Expression): 

915 return eval_node(node.body, context) 

916 if isinstance(node, ast.Expr): 

917 return eval_node(node.value, context) 

918 if isinstance(node, ast.Pass): 

919 return None 

920 if isinstance(node, ast.Import): 

921 # TODO: populate transient_locals 

922 return None 

923 if isinstance(node, (ast.AugAssign, ast.Delete)): 

924 return None 

925 if isinstance(node, (ast.Global, ast.Nonlocal)): 

926 return None 

927 if isinstance(node, ast.BinOp): 

928 left = eval_node(node.left, context) 

929 right = eval_node(node.right, context) 

930 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) 

931 if dunders: 

932 if policy.can_operate(dunders, left, right): 

933 return getattr(left, dunders[0])(right) 

934 else: 

935 raise GuardRejection( 

936 f"Operation (`{dunders}`) for", 

937 type(left), 

938 f"not allowed in {context.evaluation} mode", 

939 ) 

940 if isinstance(node, ast.Compare): 

941 left = eval_node(node.left, context) 

942 all_true = True 

943 negate = False 

944 for op, right in zip(node.ops, node.comparators): 

945 right = eval_node(right, context) 

946 dunder = None 

947 dunders = _find_dunder(op, COMP_OP_DUNDERS) 

948 if not dunders: 

949 if isinstance(op, ast.NotIn): 

950 dunders = COMP_OP_DUNDERS[ast.In] 

951 negate = True 

952 if isinstance(op, ast.Is): 

953 dunder = "is_" 

954 if isinstance(op, ast.IsNot): 

955 dunder = "is_" 

956 negate = True 

957 if not dunder and dunders: 

958 dunder = dunders[0] 

959 if dunder: 

960 a, b = (right, left) if dunder == "__contains__" else (left, right) 

961 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): 

962 result = getattr(operator, dunder)(a, b) 

963 if negate: 

964 result = not result 

965 if not result: 

966 all_true = False 

967 left = right 

968 else: 

969 raise GuardRejection( 

970 f"Comparison (`{dunder}`) for", 

971 type(left), 

972 f"not allowed in {context.evaluation} mode", 

973 ) 

974 else: 

975 raise ValueError( 

976 f"Comparison `{dunder}` not supported" 

977 ) # pragma: no cover 

978 return all_true 

979 if isinstance(node, ast.Constant): 

980 return node.value 

981 if isinstance(node, ast.Tuple): 

982 return tuple(eval_node(e, context) for e in node.elts) 

983 if isinstance(node, ast.List): 

984 return [eval_node(e, context) for e in node.elts] 

985 if isinstance(node, ast.Set): 

986 return {eval_node(e, context) for e in node.elts} 

987 if isinstance(node, ast.Dict): 

988 return dict( 

989 zip( 

990 [eval_node(k, context) for k in node.keys], 

991 [eval_node(v, context) for v in node.values], 

992 ) 

993 ) 

994 if isinstance(node, ast.Slice): 

995 return slice( 

996 eval_node(node.lower, context), 

997 eval_node(node.upper, context), 

998 eval_node(node.step, context), 

999 ) 

1000 if isinstance(node, ast.UnaryOp): 

1001 value = eval_node(node.operand, context) 

1002 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) 

1003 if dunders: 

1004 if policy.can_operate(dunders, value): 

1005 try: 

1006 return getattr(value, dunders[0])() 

1007 except AttributeError: 

1008 raise TypeError( 

1009 f"bad operand type for unary {node.op}: {type(value)}" 

1010 ) 

1011 else: 

1012 raise GuardRejection( 

1013 f"Operation (`{dunders}`) for", 

1014 type(value), 

1015 f"not allowed in {context.evaluation} mode", 

1016 ) 

1017 if isinstance(node, ast.Subscript): 

1018 value = eval_node(node.value, context) 

1019 slice_ = eval_node(node.slice, context) 

1020 if policy.can_get_item(value, slice_): 

1021 return value[slice_] 

1022 raise GuardRejection( 

1023 "Subscript access (`__getitem__`) for", 

1024 type(value), # not joined to avoid calling `repr` 

1025 f" not allowed in {context.evaluation} mode", 

1026 ) 

1027 if isinstance(node, ast.Name): 

1028 return _eval_node_name(node.id, context) 

1029 if isinstance(node, ast.Attribute): 

1030 if ( 

1031 context.class_transients is not None 

1032 and isinstance(node.value, ast.Name) 

1033 and node.value.id == context.instance_arg_name 

1034 ): 

1035 return context.class_transients.get(node.attr) 

1036 value = eval_node(node.value, context) 

1037 if policy.can_get_attr(value, node.attr): 

1038 return getattr(value, node.attr) 

1039 try: 

1040 cls = ( 

1041 value if isinstance(value, type) else getattr(value, "__class__", None) 

1042 ) 

1043 if cls is not None: 

1044 resolved_hints = get_type_hints( 

1045 cls, 

1046 globalns=(context.globals or {}), 

1047 localns=(context.locals or {}), 

1048 ) 

1049 if node.attr in resolved_hints: 

1050 annotated = resolved_hints[node.attr] 

1051 return _resolve_annotation(annotated, context) 

1052 except Exception: 

1053 # Fall through to the guard rejection 

1054 pass 

1055 raise GuardRejection( 

1056 "Attribute access (`__getattr__`) for", 

1057 type(value), # not joined to avoid calling `repr` 

1058 f"not allowed in {context.evaluation} mode", 

1059 ) 

1060 if isinstance(node, ast.IfExp): 

1061 test = eval_node(node.test, context) 

1062 if test: 

1063 return eval_node(node.body, context) 

1064 else: 

1065 return eval_node(node.orelse, context) 

1066 if isinstance(node, ast.Call): 

1067 func = eval_node(node.func, context) 

1068 if policy.can_call(func): 

1069 args, kwargs = _extract_args_and_kwargs(node, context) 

1070 return func(*args, **kwargs) 

1071 if isclass(func): 

1072 # this code path gets entered when calling class e.g. `MyClass()` 

1073 # or `my_instance.__class__()` - in both cases `func` is `MyClass`. 

1074 # Should return `MyClass` if `__new__` is not overridden, 

1075 # otherwise whatever `__new__` return type is. 

1076 overridden_return_type = _eval_return_type(func.__new__, node, context) 

1077 if overridden_return_type is not NOT_EVALUATED: 

1078 return overridden_return_type 

1079 return _create_duck_for_heap_type(func) 

1080 else: 

1081 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED) 

1082 return_type = _eval_return_type(func, node, context) 

1083 if getattr(func, "__is_async__", False): 

1084 awaited_type = ( 

1085 inferred_return if inferred_return is not None else return_type 

1086 ) 

1087 coroutine_duck = _Duck(attributes=_get_coroutine_attributes()) 

1088 coroutine_duck.__awaited_type__ = awaited_type 

1089 return coroutine_duck 

1090 if inferred_return is not NOT_EVALUATED: 

1091 return inferred_return 

1092 if return_type is not NOT_EVALUATED: 

1093 return return_type 

1094 raise GuardRejection( 

1095 "Call for", 

1096 func, # not joined to avoid calling `repr` 

1097 f"not allowed in {context.evaluation} mode", 

1098 ) 

1099 if isinstance(node, ast.Assert): 

1100 # message is always the second item, so if it is defined user would be completing 

1101 # on the message, not on the assertion test 

1102 if node.msg: 

1103 return eval_node(node.msg, context) 

1104 return eval_node(node.test, context) 

1105 return None 

1106 

1107 

def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
    """Combine several mappings into one, merging the values that share a key.

    When ``dicts`` holds exactly one mapping, that very object is returned
    (not a copy).  Otherwise every key present in any of the mappings ends up
    in a new ``dict`` whose value is ``_merge_values`` applied to all values
    recorded under that key.
    """
    if len(dicts) == 1:
        return dicts[0]

    every_key = {key for mapping in dicts for key in mapping.keys()}

    combined = {}
    for key in every_key:
        candidates = [mapping[key] for mapping in dicts if key in mapping]
        if candidates:
            combined[key] = _merge_values(candidates, policy)
    return combined

1124 

1125 

def _merge_values(values, policy: EvaluationPolicy):
    """Fold several candidate values into a single stand-in object.

    A lone value is returned untouched.  Otherwise attribute names (from
    ``dir``) and mapping entries (from ``items()``, or keys only via
    ``keys()``) are gathered from each value whenever ``policy`` permits the
    corresponding call; any failure while probing a value is ignored.

    If all values share one type and the first value does not look like a
    mapping, the result is the type itself for ``list``/``set``/``tuple`` and
    the first value for anything else.  In every other case a ``_Duck``
    exposing the gathered attribute names and the recursively merged items is
    returned.
    """
    if len(values) == 1:
        return values[0]

    seen_types = {type(value) for value in values}
    attribute_names = set()
    items_by_key = {}
    for value in values:
        if policy.can_call(value.__dir__):
            attribute_names.update(dir(value))
        try:
            if policy.can_call(value.items):
                try:
                    for key, item in value.items():
                        items_by_key.setdefault(key, []).append(item)
                except Exception:
                    pass
            elif policy.can_call(value.keys):
                try:
                    # Only the keys are known here; ``None`` marks "no value".
                    for key in value.keys():
                        items_by_key.setdefault(key, []).append(None)
                except Exception:
                    pass
        except Exception:
            # Values without ``items``/``keys`` are simply not mappings.
            pass

    merged_items = None
    if items_by_key:
        merged_items = {}
        for key, collected in items_by_key.items():
            if collected[0] is None:
                merged_items[key] = None
            else:
                merged_items[key] = _merge_values(collected, policy)

    if len(seen_types) == 1:
        (only_type,) = seen_types
        first = values[0]
        looks_like_mapping = only_type in (dict,) or (
            hasattr(first, "__getitem__")
            and (hasattr(first, "items") or hasattr(first, "keys"))
        )
        if not looks_like_mapping:
            if only_type in (list, set, tuple):
                return only_type
            return first

    return _Duck(attributes=dict.fromkeys(attribute_names), items=merged_items)

1174 

1175 

def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
    """Infer what a function returns by evaluating its ``return`` statements.

    Returns ``None`` when no return value could be collected, the value
    itself when there is exactly one, and otherwise the result of merging
    all collected values with ``_merge_values`` under the context's policy.
    """
    candidates = _collect_return_values(node.body, context)
    if len(candidates) > 1:
        return _merge_values(candidates, get_policy(context))
    return candidates[0] if candidates else None

1187 

1188 

1189def _collect_return_values(body, context): 

1190 """Recursively collect return values from a list of AST statements.""" 

1191 return_values = [] 

1192 for stmt in body: 

1193 if isinstance(stmt, ast.Return): 

1194 if stmt.value is None: 

1195 continue 

1196 try: 

1197 value = eval_node(stmt.value, context) 

1198 if value is not None and value is not NOT_EVALUATED: 

1199 return_values.append(value) 

1200 except Exception: 

1201 pass 

1202 if isinstance( 

1203 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda) 

1204 ): 

1205 continue 

1206 elif hasattr(stmt, "body") and isinstance(stmt.body, list): 

1207 return_values.extend(_collect_return_values(stmt.body, context)) 

1208 if isinstance(stmt, ast.Try): 

1209 for h in stmt.handlers: 

1210 if hasattr(h, "body"): 

1211 return_values.extend(_collect_return_values(h.body, context)) 

1212 if hasattr(stmt, "orelse"): 

1213 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1214 if hasattr(stmt, "finalbody"): 

1215 return_values.extend(_collect_return_values(stmt.finalbody, context)) 

1216 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list): 

1217 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1218 return return_values 

1219 

1220 

def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Work out what calling ``func`` would return, based on its annotation.

    The signature is obtained with ``inspect.signature``; when that raises
    ``ValueError`` the ``UNKNOWN_SIGNATURE`` placeholder is used instead.

    Returns the resolved return annotation (a built-in value or a duck), or
    the ``NOT_EVALUATED`` sentinel when no return annotation is present.
    """
    try:
        sig = signature(func)
    except ValueError:
        sig = UNKNOWN_SIGNATURE

    # If the annotation was not stringized, or it was stringized but
    # resolved by the ``signature`` call, the return type is known.
    declared = sig.return_annotation
    if declared is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(declared, context, sig, func, node)

1236 

1237 

def _eval_annotation(
    annotation: str,
    context: EvaluationContext,
):
    """Turn a stringized annotation into the object it names.

    Strings are looked up with ``_eval_node_name``; anything that is not a
    string is handed back unchanged.
    """
    if not isinstance(annotation, str):
        return annotation
    return _eval_node_name(annotation, context)

1247 

1248 

1249class _GetItemDuck(dict): 

1250 """A dict subclass that always returns the factory instance and claims to have any item.""" 

1251 

1252 def __init__(self, factory, *args, **kwargs): 

1253 super().__init__(*args, **kwargs) 

1254 self._factory = factory 

1255 

1256 def __getitem__(self, key): 

1257 return self._factory() 

1258 

1259 def __contains__(self, key): 

1260 return True 

1261 

1262 

def _resolve_annotation(
    annotation: object | str,
    context: EvaluationContext,
    sig: Signature | None = None,
    func: Callable | None = None,
    node: ast.Call | None = None,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    Parameters
    ----------
    annotation
        The annotation object, or its stringized name (looked up through
        ``_eval_annotation``).
    context
        Evaluation context used for name lookups and nested evaluation.
    sig
        Signature of ``func`` when known; used to locate ``AnyStr`` parameters.
    func
        The callable whose annotation is being resolved, if any.
    node
        The call node being evaluated; its positional arguments provide the
        concrete value for an ``AnyStr`` return type.

    Returns
    -------
    A value or duck standing in for an instance of the annotated type.
    ``None`` is returned for a ``None`` annotation, for ``Literal`` with
    several values, and for ``AnyStr`` when the matching call argument cannot
    be determined.
    """
    # Imported locally: the module header pulls other names from ``types``.
    from types import UnionType

    if annotation is None:
        return None
    annotation = _eval_annotation(annotation, context)
    origin = get_origin(annotation)
    if annotation is Self and func and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
        # Several literal values: no single value to pick, resolves to None.
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        # The result has the type of whichever argument is annotated as
        # ``AnyStr``: find that parameter's position, then evaluate the
        # argument passed at that position in the call.
        index = None
        if func and hasattr(func, "__node__"):
            def_node = func.__node__
            for i, arg in enumerate(def_node.args.args):
                # Only plain names can be looked up; unannotated parameters
                # and complex annotations (subscripts, attributes) are skipped.
                if not isinstance(arg.annotation, ast.Name):
                    continue
                if _eval_annotation(arg.annotation.id, context) is AnyStr:
                    index = i
                    break
            is_bound_method = (
                isinstance(func, MethodType) and getattr(func, "__self__") is not None
            )
            # The bound instance is not among the call arguments.  Compare
            # against None (not truthiness) so that index 0 is shifted too and
            # gets rejected by the range check below.
            if index is not None and is_bound_method:
                index -= 1
        elif sig:
            for i, param in enumerate(sig.parameters.values()):
                if param.annotation is AnyStr:
                    index = i
                    break
        if index is None or node is None:
            return None
        if index < 0 or index >= len(node.args):
            return None
        return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is set or origin is list:
        type_args = get_args(annotation)
        if not type_args:
            # Bare ``typing.List`` / ``typing.Set``: no element type to model,
            # treat it like the plain built-in.
            return _eval_or_create_duck(origin, context)
        # only one type argument allowed
        element = _resolve_annotation(type_args[0], context, sig, func, node)
        duck = _Duck(attributes=dict.fromkeys(dir(element)))
        return _Duck(
            attributes=dict.fromkeys(dir(origin())),
            # items are not strictly needed for set
            items=_GetItemDuck(lambda: duck),
        )
    elif origin is tuple:
        # multiple type arguments
        return tuple(
            _resolve_annotation(arg, context, sig, func, node)
            for arg in get_args(annotation)
        )
    elif origin is Union or origin is UnionType:
        # multiple type arguments; ``X | Y`` reports ``types.UnionType``
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, context, sig, func, node)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        # Only the first argument is the type; the rest is metadata.
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, context, sig, func, node)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, context)
    else:
        return _eval_or_create_duck(annotation, context)

1358 

1359 

def _eval_node_name(node_id: str, context: EvaluationContext):
    """Look up the object bound to the name ``node_id``.

    Namespaces are consulted in this order: transient locals, locals,
    globals, builtins, and finally the context's auto-import hook; every
    step but the first is gated by the matching policy flag.

    Raises ``GuardRejection`` when the name was not found and the policy
    allows neither locals nor globals access, ``NameError`` otherwise.
    """
    policy = get_policy(context)

    transient = context.transient_locals
    if node_id in transient:
        return transient[node_id]

    if policy.allow_locals_access and node_id in context.locals:
        return context.locals[node_id]

    if policy.allow_globals_access and node_id in context.globals:
        return context.globals[node_id]

    if policy.allow_builtins_access and hasattr(builtins, node_id):
        # note: do not use __builtins__, it is implementation detail of cPython
        return getattr(builtins, node_id)

    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if policy.allow_globals_access or policy.allow_locals_access:
        raise NameError(f"{node_id} not found in locals, globals, nor builtins")
    raise GuardRejection(
        f"Namespace access not allowed in {context.evaluation} mode"
    )

1379 

1380 

def _eval_or_create_duck(duck_type, context: EvaluationContext):
    """Produce a stand-in instance for the annotated type ``duck_type``.

    Types the policy allows calling (allow-listed builtins) are instantiated
    for real; anything else (e.g. a custom class) is mocked with
    ``_create_duck_for_heap_type``.
    """
    if get_policy(context).can_call(duck_type):
        return duck_type()
    return _create_duck_for_heap_type(duck_type)

1388 

1389 

def _create_duck_for_heap_type(duck_type):
    """Build an object that impersonates an instance of ``duck_type``.

    A fresh ``ImpersonatingDuck`` gets its ``__class__`` reassigned to
    ``duck_type``.  Returns that duck, or the ``NOT_EVALUATED`` sentinel when
    the reassignment is refused with ``TypeError``.
    """
    duck = ImpersonatingDuck()
    try:
        # this only works for heap types, not builtins
        duck.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return duck

1403 

1404 

# Third-party types on which subscripting is allow-listed.  Each entry is a
# tuple of dotted-path components ending in the class name; the set is passed
# as ``allowed_getitem_external`` to the "limited" policy.
SUPPORTED_EXTERNAL_GETITEM = {
    ("numpy", "ndarray"),
    ("numpy", "void"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "core", "indexing", "_iLocIndexer"),
}

1413 

1414 

# Types on which subscripting is allow-listed; passed as ``allowed_getitem``
# to the "limited" policy and reused as the base of ``BUILTIN_GETATTR``.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    # built-in containers and text types
    bytes,  # type: ignore[arg-type]
    dict,
    list,
    str,  # type: ignore[arg-type]
    tuple,
    type,  # for type annotations like list[str]
    # ``collections`` containers
    collections.ChainMap,
    collections.OrderedDict,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    collections.defaultdict,
    collections.deque,
    # stand-in types defined in this module
    _Duck,
    _DummyNamedTuple,
    _IdentitySubscript,
}

1433 

1434 

1435def _list_methods(cls, source=None): 

1436 """For use on immutable objects or with methods returning a copy""" 

1437 return [getattr(cls, k) for k in (source if source else dir(cls))] 

1438 

1439 

# Names of container methods that are allow-listed for calls (see
# ``ALLOWED_CALLS``); per the variable names, these do not mutate the receiver.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# For ``set``: every name it has in common with the immutable ``frozenset``.
set_non_mutating_methods = set(dir(frozenset)).intersection(dir(set))


# Concrete classes of the dict views, obtained from an empty dict.
dict_keys: type[collections.abc.KeysView] = type(dict().keys())
dict_values: type = type(dict().values())
dict_items: type = type(dict().items())

# Built-in numeric types.
NUMERICS = {complex, float, int}

1450 

# Callables that may be invoked under the "limited" policy (passed to it as
# ``allowed_calls``).
ALLOWED_CALLS = {
    # Immutable built-ins: the type, every attribute listed by ``dir`` and
    # the iterator protocol.
    *(
        member
        for immutable in (bytes, frozenset, str, tuple)
        for member in (immutable, *_list_methods(immutable), immutable.__iter__)
    ),
    bool,
    *_list_methods(bool),
    *NUMERICS,
    *(method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)),
    range,
    range.__iter__,
    # Containers: the type, iteration, and only the explicitly named methods.
    *(
        member
        for container, method_names in (
            (dict, dict_non_mutating_methods),
            (list, list_non_mutating_methods),
            (set, set_non_mutating_methods),
            (collections.deque, list_non_mutating_methods),
            (collections.defaultdict, dict_non_mutating_methods),
            (collections.OrderedDict, dict_non_mutating_methods),
            (collections.UserDict, dict_non_mutating_methods),
            (collections.UserList, list_non_mutating_methods),
            (collections.UserString, dir(str)),
            (collections.Counter, dict_non_mutating_methods),
        )
        for member in (
            container,
            *_list_methods(container, method_names),
            container.__iter__,
        )
    ),
    # Dict views.
    dict_keys.__iter__,
    dict_values.__iter__,
    dict_items.__iter__,
    dict_keys.isdisjoint,
    # Extra ``Counter`` helpers.
    collections.Counter.elements,
    collections.Counter.most_common,
    # Attribute listing.
    object.__dir__,
    type.__dir__,
    _Duck.__dir__,
}

1510 

# Types on which attribute access is allow-listed: everything that supports
# allow-listed subscripting plus the types below.  Passed as
# ``allowed_getattr`` to the "limited" policy.
BUILTIN_GETATTR: set[MayHaveGetattr] = BUILTIN_GETITEM | {
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Separate set object with the same members; passed as ``allowed_operations``
# to the "limited" policy.
BUILTIN_OPERATIONS = set(BUILTIN_GETATTR)

1525 

# Policy objects keyed by evaluation mode name.
EVALUATION_POLICIES = {
    # Builtins are visible, everything else is switched off.
    "minimal": EvaluationPolicy(
        allow_any_calls=False,
        allow_all_operations=False,
        allow_attr_access=False,
        allow_item_access=False,
        allow_globals_access=False,
        allow_locals_access=False,
        allow_builtins_access=True,
        allowed_calls=set(),
    ),
    # All namespaces are visible; item/attribute access, operations and calls
    # are restricted to the allow-lists defined above.
    "limited": SelectivePolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_getitem_on_types=True,
        allowed_calls=ALLOWED_CALLS,
        allowed_operations=BUILTIN_OPERATIONS,
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
    ),
    # Every flag is switched on.
    "unsafe": EvaluationPolicy(
        allow_any_calls=True,
        allow_all_operations=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_globals_access=True,
        allow_locals_access=True,
        allow_builtins_access=True,
    ),
}

1563 

1564 

# Names exported by a star-import of this module.  Note that the private
# helper ``_unbind_method`` is deliberately listed alongside the public API.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]