Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

812 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import dataclasses 

23import operator 

24import sys 

25import typing 

26import warnings 

27from functools import cached_property 

28from dataclasses import dataclass, field 

29from types import MethodDescriptorType, ModuleType, MethodType 

30 

31from IPython.utils.decorators import undoc 

32 

33 

34from typing import Self, LiteralString 

35 

36if sys.version_info < (3, 12): 

37 from typing_extensions import TypeAliasType 

38else: 

39 from typing import TypeAliasType 

40 

41 

42@undoc 

43class HasGetItem(Protocol): 

44 def __getitem__(self, key) -> None: 

45 ... 

46 

47 

48@undoc 

49class InstancesHaveGetItem(Protocol): 

50 def __call__(self, *args, **kwargs) -> HasGetItem: 

51 ... 

52 

53 

54@undoc 

55class HasGetAttr(Protocol): 

56 def __getattr__(self, key) -> None: 

57 ... 

58 

59 

60@undoc 

61class DoesNotHaveGetAttr(Protocol): 

62 pass 

63 

64 

65# By default `__getattr__` is not explicitly implemented on most objects 

66MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr] 

67 

68 

69def _unbind_method(func: Callable) -> Union[Callable, None]: 

70 """Get unbound method for given bound method. 

71 

72 Returns None if cannot get unbound method, or method is already unbound. 

73 """ 

74 owner = getattr(func, "__self__", None) 

75 owner_class = type(owner) 

76 name = getattr(func, "__name__", None) 

77 instance_dict_overrides = getattr(owner, "__dict__", None) 

78 if ( 

79 owner is not None 

80 and name 

81 and ( 

82 not instance_dict_overrides 

83 or (instance_dict_overrides and name not in instance_dict_overrides) 

84 ) 

85 ): 

86 return getattr(owner_class, name) 

87 return None 

88 

89 

90@undoc 

91@dataclass 

92class EvaluationPolicy: 

93 """Definition of evaluation policy.""" 

94 

95 allow_locals_access: bool = False 

96 allow_globals_access: bool = False 

97 allow_item_access: bool = False 

98 allow_attr_access: bool = False 

99 allow_builtins_access: bool = False 

100 allow_all_operations: bool = False 

101 allow_any_calls: bool = False 

102 allow_auto_import: bool = False 

103 allowed_calls: set[Callable] = field(default_factory=set) 

104 

105 def can_get_item(self, value, item): 

106 return self.allow_item_access 

107 

108 def can_get_attr(self, value, attr): 

109 return self.allow_attr_access 

110 

111 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

112 if self.allow_all_operations: 

113 return True 

114 

115 def can_call(self, func): 

116 if self.allow_any_calls: 

117 return True 

118 

119 if func in self.allowed_calls: 

120 return True 

121 

122 owner_method = _unbind_method(func) 

123 

124 if owner_method and owner_method in self.allowed_calls: 

125 return True 

126 

127 

128def _get_external(module_name: str, access_path: Sequence[str]): 

129 """Get value from external module given a dotted access path. 

130 

131 Only gets value if the module is already imported. 

132 

133 Raises: 

134 * `KeyError` if module is removed not found, and 

135 * `AttributeError` if access path does not match an exported object 

136 """ 

137 try: 

138 member_type = sys.modules[module_name] 

139 # standard module 

140 for attr in access_path: 

141 member_type = getattr(member_type, attr) 

142 return member_type 

143 except (KeyError, AttributeError): 

144 # handle modules in namespace packages 

145 module_path = ".".join([module_name, *access_path]) 

146 if module_path in sys.modules: 

147 return sys.modules[module_path] 

148 raise 

149 

150 

151def _has_original_dunder_external( 

152 value, 

153 module_name: str, 

154 access_path: Sequence[str], 

155 method_name: str, 

156): 

157 if module_name not in sys.modules: 

158 full_module_path = ".".join([module_name, *access_path]) 

159 if full_module_path not in sys.modules: 

160 # LBYLB as it is faster 

161 return False 

162 try: 

163 member_type = _get_external(module_name, access_path) 

164 value_type = type(value) 

165 if type(value) == member_type: 

166 return True 

167 if isinstance(member_type, ModuleType): 

168 value_module = getmodule(value_type) 

169 if not value_module or not value_module.__name__: 

170 return False 

171 if ( 

172 value_module.__name__ == member_type.__name__ 

173 or value_module.__name__.startswith(member_type.__name__ + ".") 

174 ): 

175 return True 

176 if method_name == "__getattribute__": 

177 # we have to short-circuit here due to an unresolved issue in 

178 # `isinstance` implementation: https://bugs.python.org/issue32683 

179 return False 

180 if not isinstance(member_type, ModuleType) and isinstance(value, member_type): 

181 method = getattr(value_type, method_name, None) 

182 member_method = getattr(member_type, method_name, None) 

183 if member_method == method: 

184 return True 

185 if isinstance(member_type, ModuleType): 

186 method = getattr(value_type, method_name, None) 

187 for base_class in value_type.__mro__[1:]: 

188 base_module = getmodule(base_class) 

189 if base_module and ( 

190 base_module.__name__ == member_type.__name__ 

191 or base_module.__name__.startswith(member_type.__name__ + ".") 

192 ): 

193 # Check if the method comes from this trusted base class 

194 base_method = getattr(base_class, method_name, None) 

195 if base_method is not None and base_method == method: 

196 return True 

197 except (AttributeError, KeyError): 

198 return False 

199 

200 

201def _has_original_dunder( 

202 value, allowed_types, allowed_methods, allowed_external, method_name 

203): 

204 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

205 # we only need to check at class level 

206 value_type = type(value) 

207 

208 # strict type check passes → no need to check method 

209 if value_type in allowed_types: 

210 return True 

211 

212 method = getattr(value_type, method_name, None) 

213 

214 if method is None: 

215 return None 

216 

217 if method in allowed_methods: 

218 return True 

219 

220 for module_name, *access_path in allowed_external: 

221 if _has_original_dunder_external(value, module_name, access_path, method_name): 

222 return True 

223 

224 return False 

225 

226 

227def _coerce_path_to_tuples( 

228 allow_list: set[tuple[str, ...] | str], 

229) -> set[tuple[str, ...]]: 

230 """Replace dotted paths on the provided allow-list with tuples.""" 

231 return { 

232 path if isinstance(path, tuple) else tuple(path.split(".")) 

233 for path in allow_list 

234 } 

235 

236 

237@undoc 

238@dataclass 

239class SelectivePolicy(EvaluationPolicy): 

240 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set) 

241 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set) 

242 

243 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set) 

244 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set) 

245 

246 allowed_operations: set = field(default_factory=set) 

247 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set) 

248 

249 allow_getitem_on_types: bool = field(default_factory=bool) 

250 

251 _operation_methods_cache: dict[str, set[Callable]] = field( 

252 default_factory=dict, init=False 

253 ) 

254 

255 def can_get_attr(self, value, attr): 

256 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external) 

257 

258 has_original_attribute = _has_original_dunder( 

259 value, 

260 allowed_types=self.allowed_getattr, 

261 allowed_methods=self._getattribute_methods, 

262 allowed_external=allowed_getattr_external, 

263 method_name="__getattribute__", 

264 ) 

265 has_original_attr = _has_original_dunder( 

266 value, 

267 allowed_types=self.allowed_getattr, 

268 allowed_methods=self._getattr_methods, 

269 allowed_external=allowed_getattr_external, 

270 method_name="__getattr__", 

271 ) 

272 

273 accept = False 

274 

275 # Many objects do not have `__getattr__`, this is fine. 

276 if has_original_attr is None and has_original_attribute: 

277 accept = True 

278 else: 

279 # Accept objects without modifications to `__getattr__` and `__getattribute__` 

280 accept = has_original_attr and has_original_attribute 

281 

282 if accept: 

283 # We still need to check for overridden properties. 

284 

285 value_class = type(value) 

286 if not hasattr(value_class, attr): 

287 return True 

288 

289 class_attr_val = getattr(value_class, attr) 

290 is_property = isinstance(class_attr_val, property) 

291 

292 if not is_property: 

293 return True 

294 

295 # Properties in allowed types are ok (although we do not include any 

296 # properties in our default allow list currently). 

297 if type(value) in self.allowed_getattr: 

298 return True # pragma: no cover 

299 

300 # Properties in subclasses of allowed types may be ok if not changed 

301 for module_name, *access_path in allowed_getattr_external: 

302 try: 

303 external_class = _get_external(module_name, access_path) 

304 external_class_attr_val = getattr(external_class, attr) 

305 except (KeyError, AttributeError): 

306 return False # pragma: no cover 

307 return class_attr_val == external_class_attr_val 

308 

309 return False 

310 

311 def can_get_item(self, value, item): 

312 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified.""" 

313 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external) 

314 if self.allow_getitem_on_types: 

315 # e.g. Union[str, int] or Literal[True, 1] 

316 if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)): 

317 return True 

318 # PEP 560 e.g. list[str] 

319 if isinstance(value, type) and hasattr(value, "__class_getitem__"): 

320 return True 

321 return _has_original_dunder( 

322 value, 

323 allowed_types=self.allowed_getitem, 

324 allowed_methods=self._getitem_methods, 

325 allowed_external=allowed_getitem_external, 

326 method_name="__getitem__", 

327 ) 

328 

329 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

330 allowed_operations_external = _coerce_path_to_tuples( 

331 self.allowed_operations_external 

332 ) 

333 objects = [a] 

334 if b is not None: 

335 objects.append(b) 

336 return all( 

337 [ 

338 _has_original_dunder( 

339 obj, 

340 allowed_types=self.allowed_operations, 

341 allowed_methods=self._operator_dunder_methods(dunder), 

342 allowed_external=allowed_operations_external, 

343 method_name=dunder, 

344 ) 

345 for dunder in dunders 

346 for obj in objects 

347 ] 

348 ) 

349 

350 def _operator_dunder_methods(self, dunder: str) -> set[Callable]: 

351 if dunder not in self._operation_methods_cache: 

352 self._operation_methods_cache[dunder] = self._safe_get_methods( 

353 self.allowed_operations, dunder 

354 ) 

355 return self._operation_methods_cache[dunder] 

356 

357 @cached_property 

358 def _getitem_methods(self) -> set[Callable]: 

359 return self._safe_get_methods(self.allowed_getitem, "__getitem__") 

360 

361 @cached_property 

362 def _getattr_methods(self) -> set[Callable]: 

363 return self._safe_get_methods(self.allowed_getattr, "__getattr__") 

364 

365 @cached_property 

366 def _getattribute_methods(self) -> set[Callable]: 

367 return self._safe_get_methods(self.allowed_getattr, "__getattribute__") 

368 

369 def _safe_get_methods(self, classes, name) -> set[Callable]: 

370 return { 

371 method 

372 for class_ in classes 

373 for method in [getattr(class_, name, None)] 

374 if method 

375 } 

376 

377 

378class _DummyNamedTuple(NamedTuple): 

379 """Used internally to retrieve methods of named tuple instance.""" 

380 

381 

382EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] 

383 

384 

385@dataclass 

386class EvaluationContext: 

387 #: Local namespace 

388 locals: dict 

389 #: Global namespace 

390 globals: dict 

391 #: Evaluation policy identifier 

392 evaluation: EvaluationPolicyName = "forbidden" 

393 #: Whether the evaluation of code takes place inside of a subscript. 

394 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. 

395 in_subscript: bool = False 

396 #: Auto import method 

397 auto_import: Callable[list[str], ModuleType] | None = None 

398 #: Overrides for evaluation policy 

399 policy_overrides: dict = field(default_factory=dict) 

400 #: Transient local namespace used to store mocks 

401 transient_locals: dict = field(default_factory=dict) 

402 #: Transients of class level 

403 class_transients: dict | None = None 

404 #: Instance variable name used in the method definition 

405 instance_arg_name: str | None = None 

406 

407 def replace(self, /, **changes): 

408 """Return a new copy of the context, with specified changes""" 

409 return dataclasses.replace(self, **changes) 

410 

411 

412class _IdentitySubscript: 

413 """Returns the key itself when item is requested via subscript.""" 

414 

415 def __getitem__(self, key): 

416 return key 

417 

418 

419IDENTITY_SUBSCRIPT = _IdentitySubscript() 

420SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__" 

421UNKNOWN_SIGNATURE = Signature() 

422NOT_EVALUATED = object() 

423 

424 

425class GuardRejection(Exception): 

426 """Exception raised when guard rejects evaluation attempt.""" 

427 

428 pass 

429 

430 

431def guarded_eval(code: str, context: EvaluationContext): 

432 """Evaluate provided code in the evaluation context. 

433 

434 If evaluation policy given by context is set to ``forbidden`` 

435 no evaluation will be performed; if it is set to ``dangerous`` 

436 standard :func:`eval` will be used; finally, for any other, 

437 policy :func:`eval_node` will be called on parsed AST. 

438 """ 

439 locals_ = context.locals 

440 

441 if context.evaluation == "forbidden": 

442 raise GuardRejection("Forbidden mode") 

443 

444 # note: not using `ast.literal_eval` as it does not implement 

445 # getitem at all, for example it fails on simple `[0][1]` 

446 

447 if context.in_subscript: 

448 # syntactic sugar for ellipsis (:) is only available in subscripts 

449 # so we need to trick the ast parser into thinking that we have 

450 # a subscript, but we need to be able to later recognise that we did 

451 # it so we can ignore the actual __getitem__ operation 

452 if not code: 

453 return tuple() 

454 locals_ = locals_.copy() 

455 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT 

456 code = SUBSCRIPT_MARKER + "[" + code + "]" 

457 context = context.replace(locals=locals_) 

458 

459 if context.evaluation == "dangerous": 

460 return eval(code, context.globals, context.locals) 

461 

462 node = ast.parse(code, mode="exec") 

463 

464 return eval_node(node, context) 

465 

466 

467BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = { 

468 ast.Add: ("__add__",), 

469 ast.Sub: ("__sub__",), 

470 ast.Mult: ("__mul__",), 

471 ast.Div: ("__truediv__",), 

472 ast.FloorDiv: ("__floordiv__",), 

473 ast.Mod: ("__mod__",), 

474 ast.Pow: ("__pow__",), 

475 ast.LShift: ("__lshift__",), 

476 ast.RShift: ("__rshift__",), 

477 ast.BitOr: ("__or__",), 

478 ast.BitXor: ("__xor__",), 

479 ast.BitAnd: ("__and__",), 

480 ast.MatMult: ("__matmul__",), 

481} 

482 

483COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = { 

484 ast.Eq: ("__eq__",), 

485 ast.NotEq: ("__ne__", "__eq__"), 

486 ast.Lt: ("__lt__", "__gt__"), 

487 ast.LtE: ("__le__", "__ge__"), 

488 ast.Gt: ("__gt__", "__lt__"), 

489 ast.GtE: ("__ge__", "__le__"), 

490 ast.In: ("__contains__",), 

491 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially 

492} 

493 

494UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = { 

495 ast.USub: ("__neg__",), 

496 ast.UAdd: ("__pos__",), 

497 # we have to check both __inv__ and __invert__! 

498 ast.Invert: ("__invert__", "__inv__"), 

499 ast.Not: ("__not__",), 

500} 

501 

502 

503class ImpersonatingDuck: 

504 """A dummy class used to create objects of other classes without calling their ``__init__``""" 

505 

506 # no-op: override __class__ to impersonate 

507 

508 

509class _Duck: 

510 """A dummy class used to create objects pretending to have given attributes""" 

511 

512 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

513 self.attributes = attributes if attributes is not None else {} 

514 self.items = items if items is not None else {} 

515 

516 def __getattr__(self, attr: str): 

517 return self.attributes[attr] 

518 

519 def __hasattr__(self, attr: str): 

520 return attr in self.attributes 

521 

522 def __dir__(self): 

523 return [*dir(super), *self.attributes] 

524 

525 def __getitem__(self, key: str): 

526 return self.items[key] 

527 

528 def __hasitem__(self, key: str): 

529 return self.items[key] 

530 

531 def _ipython_key_completions_(self): 

532 return self.items.keys() 

533 

534 

535def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

536 dunder = None 

537 for op, candidate_dunder in dunders.items(): 

538 if isinstance(node_op, op): 

539 dunder = candidate_dunder 

540 return dunder 

541 

542 

543def get_policy(context: EvaluationContext) -> EvaluationPolicy: 

544 policy = copy(EVALUATION_POLICIES[context.evaluation]) 

545 

546 for key, value in context.policy_overrides.items(): 

547 if hasattr(policy, key): 

548 setattr(policy, key, value) 

549 return policy 

550 

551 

552def _validate_policy_overrides( 

553 policy_name: EvaluationPolicyName, policy_overrides: dict 

554) -> bool: 

555 policy = EVALUATION_POLICIES[policy_name] 

556 

557 all_good = True 

558 for key, value in policy_overrides.items(): 

559 if not hasattr(policy, key): 

560 warnings.warn( 

561 f"Override {key!r} is not valid with {policy_name!r} evaluation policy" 

562 ) 

563 all_good = False 

564 return all_good 

565 

566 

567def _handle_assign(node: ast.Assign, context: EvaluationContext): 

568 value = eval_node(node.value, context) 

569 transient_locals = context.transient_locals 

570 policy = get_policy(context) 

571 class_transients = context.class_transients 

572 for target in node.targets: 

573 if isinstance(target, (ast.Tuple, ast.List)): 

574 # Handle unpacking assignment 

575 values = list(value) 

576 targets = target.elts 

577 starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)] 

578 

579 # Unified handling: treat no starred as starred at end 

580 star_or_last_idx = starred[0] if starred else len(targets) 

581 

582 # Before starred 

583 for i in range(star_or_last_idx): 

584 # Check for self.x assignment 

585 if _is_instance_attribute_assignment(targets[i], context): 

586 class_transients[targets[i].attr] = values[i] 

587 else: 

588 transient_locals[targets[i].id] = values[i] 

589 

590 # Starred if exists 

591 if starred: 

592 end = len(values) - (len(targets) - star_or_last_idx - 1) 

593 if _is_instance_attribute_assignment( 

594 targets[star_or_last_idx], context 

595 ): 

596 class_transients[targets[star_or_last_idx].attr] = values[ 

597 star_or_last_idx:end 

598 ] 

599 else: 

600 transient_locals[targets[star_or_last_idx].value.id] = values[ 

601 star_or_last_idx:end 

602 ] 

603 

604 # After starred 

605 for i in range(star_or_last_idx + 1, len(targets)): 

606 if _is_instance_attribute_assignment(targets[i], context): 

607 class_transients[targets[i].attr] = values[ 

608 len(values) - (len(targets) - i) 

609 ] 

610 else: 

611 transient_locals[targets[i].id] = values[ 

612 len(values) - (len(targets) - i) 

613 ] 

614 elif isinstance(target, ast.Subscript): 

615 if isinstance(target.value, ast.Name): 

616 name = target.value.id 

617 container = transient_locals.get(name) 

618 if container is None: 

619 container = context.locals.get(name) 

620 if container is None: 

621 container = context.globals.get(name) 

622 if container is None: 

623 raise NameError( 

624 f"{name} not found in locals, globals, nor builtins" 

625 ) 

626 storage_dict = transient_locals 

627 storage_key = name 

628 elif isinstance( 

629 target.value, ast.Attribute 

630 ) and _is_instance_attribute_assignment(target.value, context): 

631 attr = target.value.attr 

632 container = class_transients.get(attr, None) 

633 if container is None: 

634 raise NameError(f"{attr} not found in class transients") 

635 storage_dict = class_transients 

636 storage_key = attr 

637 else: 

638 return 

639 

640 key = eval_node(target.slice, context) 

641 attributes = ( 

642 dict.fromkeys(dir(container)) 

643 if policy.can_call(container.__dir__) 

644 else {} 

645 ) 

646 items = {} 

647 

648 if policy.can_get_item(container, None): 

649 try: 

650 items = dict(container.items()) 

651 except Exception: 

652 pass 

653 

654 items[key] = value 

655 duck_container = _Duck(attributes=attributes, items=items) 

656 storage_dict[storage_key] = duck_container 

657 elif _is_instance_attribute_assignment(target, context): 

658 class_transients[target.attr] = value 

659 else: 

660 transient_locals[target.id] = value 

661 return None 

662 

663 

664def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext): 

665 args = [eval_node(arg, context) for arg in node.args] 

666 kwargs = { 

667 k: v 

668 for kw in node.keywords 

669 for k, v in ( 

670 {kw.arg: eval_node(kw.value, context)} 

671 if kw.arg 

672 else eval_node(kw.value, context) 

673 ).items() 

674 } 

675 return args, kwargs 

676 

677 

678def _is_instance_attribute_assignment( 

679 target: ast.AST, context: EvaluationContext 

680) -> bool: 

681 """Return True if target is an attribute access on the instance argument.""" 

682 return ( 

683 context.class_transients is not None 

684 and context.instance_arg_name is not None 

685 and isinstance(target, ast.Attribute) 

686 and isinstance(getattr(target, "value", None), ast.Name) 

687 and getattr(target.value, "id", None) == context.instance_arg_name 

688 ) 

689 

690 

691def _get_coroutine_attributes() -> dict[str, Optional[object]]: 

692 async def _dummy(): 

693 return None 

694 

695 coro = _dummy() 

696 try: 

697 return {attr: getattr(coro, attr, None) for attr in dir(coro)} 

698 finally: 

699 coro.close() 

700 

701 

702def eval_node(node: Union[ast.AST, None], context: EvaluationContext): 

703 """Evaluate AST node in provided context. 

704 

705 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. 

706 

707 Does not evaluate actions that always have side effects: 

708 

709 - class definitions (``class sth: ...``) 

710 - function definitions (``def sth: ...``) 

711 - variable assignments (``x = 1``) 

712 - augmented assignments (``x += 1``) 

713 - deletions (``del x``) 

714 

715 Does not evaluate operations which do not return values: 

716 

717 - assertions (``assert x``) 

718 - pass (``pass``) 

719 - imports (``import x``) 

720 - control flow: 

721 

722 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``) 

723 - loops (``for`` and ``while``) 

724 - exception handling 

725 

726 The purpose of this function is to guard against unwanted side-effects; 

727 it does not give guarantees on protection from malicious code execution. 

728 """ 

729 policy = get_policy(context) 

730 

731 if node is None: 

732 return None 

733 if isinstance(node, (ast.Interactive, ast.Module)): 

734 result = None 

735 for child_node in node.body: 

736 result = eval_node(child_node, context) 

737 return result 

738 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

739 is_async = isinstance(node, ast.AsyncFunctionDef) 

740 func_locals = context.transient_locals.copy() 

741 func_context = context.replace(transient_locals=func_locals) 

742 is_property = False 

743 is_static = False 

744 is_classmethod = False 

745 for decorator_node in node.decorator_list: 

746 try: 

747 decorator = eval_node(decorator_node, context) 

748 except NameError: 

749 # if the decorator is not yet defined this is fine 

750 # especialy because we don't handle imports yet 

751 continue 

752 if decorator is property: 

753 is_property = True 

754 elif decorator is staticmethod: 

755 is_static = True 

756 elif decorator is classmethod: 

757 is_classmethod = True 

758 

759 if func_context.class_transients is not None: 

760 if not is_static and not is_classmethod: 

761 func_context.instance_arg_name = ( 

762 node.args.args[0].arg if node.args.args else None 

763 ) 

764 

765 return_type = eval_node(node.returns, context=context) 

766 

767 for child_node in node.body: 

768 eval_node(child_node, func_context) 

769 

770 if is_property: 

771 if return_type is not None: 

772 context.transient_locals[node.name] = _resolve_annotation( 

773 return_type, context 

774 ) 

775 else: 

776 return_value = _infer_return_value(node, func_context) 

777 context.transient_locals[node.name] = return_value 

778 

779 return None 

780 

781 def dummy_function(*args, **kwargs): 

782 pass 

783 

784 if return_type is not None: 

785 dummy_function.__annotations__["return"] = return_type 

786 else: 

787 inferred_return = _infer_return_value(node, func_context) 

788 if inferred_return is not None: 

789 dummy_function.__inferred_return__ = inferred_return 

790 

791 dummy_function.__name__ = node.name 

792 dummy_function.__node__ = node 

793 dummy_function.__is_async__ = is_async 

794 context.transient_locals[node.name] = dummy_function 

795 return None 

796 if isinstance(node, ast.Lambda): 

797 

798 def dummy_function(*args, **kwargs): 

799 pass 

800 

801 dummy_function.__inferred_return__ = eval_node(node.body, context) 

802 return dummy_function 

803 if isinstance(node, ast.ClassDef): 

804 # TODO support class decorators? 

805 class_locals = {} 

806 outer_locals = context.locals.copy() 

807 outer_locals.update(context.transient_locals) 

808 class_context = context.replace( 

809 transient_locals=class_locals, locals=outer_locals 

810 ) 

811 class_context.class_transients = class_locals 

812 for child_node in node.body: 

813 eval_node(child_node, class_context) 

814 bases = tuple([eval_node(base, context) for base in node.bases]) 

815 dummy_class = type(node.name, bases, class_locals) 

816 context.transient_locals[node.name] = dummy_class 

817 return None 

818 if isinstance(node, ast.Await): 

819 value = eval_node(node.value, context) 

820 if hasattr(value, "__awaited_type__"): 

821 return value.__awaited_type__ 

822 return value 

823 if isinstance(node, ast.While): 

824 loop_locals = context.transient_locals.copy() 

825 loop_context = context.replace(transient_locals=loop_locals) 

826 

827 result = None 

828 for stmt in node.body: 

829 result = eval_node(stmt, loop_context) 

830 

831 policy = get_policy(context) 

832 merged_locals = _merge_dicts_by_key( 

833 [loop_locals, context.transient_locals.copy()], policy 

834 ) 

835 context.transient_locals.update(merged_locals) 

836 

837 return result 

838 if isinstance(node, ast.For): 

839 try: 

840 iterable = eval_node(node.iter, context) 

841 except Exception: 

842 iterable = None 

843 

844 sample = None 

845 if iterable is not None: 

846 try: 

847 if policy.can_call(getattr(iterable, "__iter__", None)): 

848 sample = next(iter(iterable)) 

849 except Exception: 

850 sample = None 

851 

852 loop_locals = context.transient_locals.copy() 

853 loop_context = context.replace(transient_locals=loop_locals) 

854 

855 if sample is not None: 

856 try: 

857 fake_assign = ast.Assign( 

858 targets=[node.target], value=ast.Constant(value=sample) 

859 ) 

860 _handle_assign(fake_assign, loop_context) 

861 except Exception: 

862 pass 

863 

864 result = None 

865 for stmt in node.body: 

866 result = eval_node(stmt, loop_context) 

867 

868 policy = get_policy(context) 

869 merged_locals = _merge_dicts_by_key( 

870 [loop_locals, context.transient_locals.copy()], policy 

871 ) 

872 context.transient_locals.update(merged_locals) 

873 

874 return result 

875 if isinstance(node, ast.If): 

876 branches = [] 

877 current = node 

878 result = None 

879 while True: 

880 branch_locals = context.transient_locals.copy() 

881 branch_context = context.replace(transient_locals=branch_locals) 

882 for stmt in current.body: 

883 result = eval_node(stmt, branch_context) 

884 branches.append(branch_locals) 

885 if not current.orelse: 

886 break 

887 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If): 

888 # It's an elif - continue loop 

889 current = current.orelse[0] 

890 else: 

891 # It's an else block - process and break 

892 else_locals = context.transient_locals.copy() 

893 else_context = context.replace(transient_locals=else_locals) 

894 for stmt in current.orelse: 

895 result = eval_node(stmt, else_context) 

896 branches.append(else_locals) 

897 break 

898 branches.append(context.transient_locals.copy()) 

899 policy = get_policy(context) 

900 merged_locals = _merge_dicts_by_key(branches, policy) 

901 context.transient_locals.update(merged_locals) 

902 return result 

903 if isinstance(node, ast.Assign): 

904 return _handle_assign(node, context) 

905 if isinstance(node, ast.AnnAssign): 

906 if node.simple: 

907 value = _resolve_annotation(eval_node(node.annotation, context), context) 

908 context.transient_locals[node.target.id] = value 

909 # Handle non-simple annotated assignments only for self.x: type = value 

910 if _is_instance_attribute_assignment(node.target, context): 

911 value = _resolve_annotation(eval_node(node.annotation, context), context) 

912 context.class_transients[node.target.attr] = value 

913 return None 

914 if isinstance(node, ast.Expression): 

915 return eval_node(node.body, context) 

916 if isinstance(node, ast.Expr): 

917 return eval_node(node.value, context) 

918 if isinstance(node, ast.Pass): 

919 return None 

920 if isinstance(node, ast.Import): 

921 # TODO: populate transient_locals 

922 return None 

923 if isinstance(node, (ast.AugAssign, ast.Delete)): 

924 return None 

925 if isinstance(node, (ast.Global, ast.Nonlocal)): 

926 return None 

927 if isinstance(node, ast.BinOp): 

928 left = eval_node(node.left, context) 

929 right = eval_node(node.right, context) 

930 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) 

931 if dunders: 

932 if policy.can_operate(dunders, left, right): 

933 return getattr(left, dunders[0])(right) 

934 else: 

935 raise GuardRejection( 

936 f"Operation (`{dunders}`) for", 

937 type(left), 

938 f"not allowed in {context.evaluation} mode", 

939 ) 

940 if isinstance(node, ast.Compare): 

941 left = eval_node(node.left, context) 

942 all_true = True 

943 negate = False 

944 for op, right in zip(node.ops, node.comparators): 

945 right = eval_node(right, context) 

946 dunder = None 

947 dunders = _find_dunder(op, COMP_OP_DUNDERS) 

948 if not dunders: 

949 if isinstance(op, ast.NotIn): 

950 dunders = COMP_OP_DUNDERS[ast.In] 

951 negate = True 

952 if isinstance(op, ast.Is): 

953 dunder = "is_" 

954 if isinstance(op, ast.IsNot): 

955 dunder = "is_" 

956 negate = True 

957 if not dunder and dunders: 

958 dunder = dunders[0] 

959 if dunder: 

960 a, b = (right, left) if dunder == "__contains__" else (left, right) 

961 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): 

962 result = getattr(operator, dunder)(a, b) 

963 if negate: 

964 result = not result 

965 if not result: 

966 all_true = False 

967 left = right 

968 else: 

969 raise GuardRejection( 

970 f"Comparison (`{dunder}`) for", 

971 type(left), 

972 f"not allowed in {context.evaluation} mode", 

973 ) 

974 else: 

975 raise ValueError( 

976 f"Comparison `{dunder}` not supported" 

977 ) # pragma: no cover 

978 return all_true 

979 if isinstance(node, ast.Constant): 

980 return node.value 

981 if isinstance(node, ast.Tuple): 

982 return tuple(eval_node(e, context) for e in node.elts) 

983 if isinstance(node, ast.List): 

984 return [eval_node(e, context) for e in node.elts] 

985 if isinstance(node, ast.Set): 

986 return {eval_node(e, context) for e in node.elts} 

987 if isinstance(node, ast.Dict): 

988 return dict( 

989 zip( 

990 [eval_node(k, context) for k in node.keys], 

991 [eval_node(v, context) for v in node.values], 

992 ) 

993 ) 

994 if isinstance(node, ast.Slice): 

995 return slice( 

996 eval_node(node.lower, context), 

997 eval_node(node.upper, context), 

998 eval_node(node.step, context), 

999 ) 

1000 if isinstance(node, ast.UnaryOp): 

1001 value = eval_node(node.operand, context) 

1002 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) 

1003 if dunders: 

1004 if policy.can_operate(dunders, value): 

1005 try: 

1006 return getattr(value, dunders[0])() 

1007 except AttributeError: 

1008 raise TypeError( 

1009 f"bad operand type for unary {node.op}: {type(value)}" 

1010 ) 

1011 else: 

1012 raise GuardRejection( 

1013 f"Operation (`{dunders}`) for", 

1014 type(value), 

1015 f"not allowed in {context.evaluation} mode", 

1016 ) 

1017 if isinstance(node, ast.Subscript): 

1018 value = eval_node(node.value, context) 

1019 slice_ = eval_node(node.slice, context) 

1020 if policy.can_get_item(value, slice_): 

1021 return value[slice_] 

1022 raise GuardRejection( 

1023 "Subscript access (`__getitem__`) for", 

1024 type(value), # not joined to avoid calling `repr` 

1025 f" not allowed in {context.evaluation} mode", 

1026 ) 

1027 if isinstance(node, ast.Name): 

1028 return _eval_node_name(node.id, context) 

1029 if isinstance(node, ast.Attribute): 

1030 if ( 

1031 context.class_transients is not None 

1032 and isinstance(node.value, ast.Name) 

1033 and node.value.id == context.instance_arg_name 

1034 ): 

1035 return context.class_transients.get(node.attr) 

1036 value = eval_node(node.value, context) 

1037 if policy.can_get_attr(value, node.attr): 

1038 return getattr(value, node.attr) 

1039 raise GuardRejection( 

1040 "Attribute access (`__getattr__`) for", 

1041 type(value), # not joined to avoid calling `repr` 

1042 f"not allowed in {context.evaluation} mode", 

1043 ) 

1044 if isinstance(node, ast.IfExp): 

1045 test = eval_node(node.test, context) 

1046 if test: 

1047 return eval_node(node.body, context) 

1048 else: 

1049 return eval_node(node.orelse, context) 

1050 if isinstance(node, ast.Call): 

1051 func = eval_node(node.func, context) 

1052 if policy.can_call(func): 

1053 args, kwargs = _extract_args_and_kwargs(node, context) 

1054 return func(*args, **kwargs) 

1055 if isclass(func): 

1056 # this code path gets entered when calling class e.g. `MyClass()` 

1057 # or `my_instance.__class__()` - in both cases `func` is `MyClass`. 

1058 # Should return `MyClass` if `__new__` is not overridden, 

1059 # otherwise whatever `__new__` return type is. 

1060 overridden_return_type = _eval_return_type(func.__new__, node, context) 

1061 if overridden_return_type is not NOT_EVALUATED: 

1062 return overridden_return_type 

1063 return _create_duck_for_heap_type(func) 

1064 else: 

1065 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED) 

1066 return_type = _eval_return_type(func, node, context) 

1067 if getattr(func, "__is_async__", False): 

1068 awaited_type = ( 

1069 inferred_return if inferred_return is not None else return_type 

1070 ) 

1071 coroutine_duck = _Duck(attributes=_get_coroutine_attributes()) 

1072 coroutine_duck.__awaited_type__ = awaited_type 

1073 return coroutine_duck 

1074 if inferred_return is not NOT_EVALUATED: 

1075 return inferred_return 

1076 if return_type is not NOT_EVALUATED: 

1077 return return_type 

1078 raise GuardRejection( 

1079 "Call for", 

1080 func, # not joined to avoid calling `repr` 

1081 f"not allowed in {context.evaluation} mode", 

1082 ) 

1083 if isinstance(node, ast.Assert): 

1084 # message is always the second item, so if it is defined user would be completing 

1085 # on the message, not on the assertion test 

1086 if node.msg: 

1087 return eval_node(node.msg, context) 

1088 return eval_node(node.test, context) 

1089 return None 

1090 

1091 

1092def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy): 

1093 """Merge multiple dictionaries, combining values for each key.""" 

1094 if len(dicts) == 1: 

1095 return dicts[0] 

1096 

1097 all_keys = set() 

1098 for d in dicts: 

1099 all_keys.update(d.keys()) 

1100 

1101 merged = {} 

1102 for key in all_keys: 

1103 values = [d[key] for d in dicts if key in d] 

1104 if values: 

1105 merged[key] = _merge_values(values, policy) 

1106 

1107 return merged 

1108 

1109 

1110def _merge_values(values, policy: EvaluationPolicy): 

1111 """Recursively merge multiple values, combining attributes and dict items.""" 

1112 if len(values) == 1: 

1113 return values[0] 

1114 

1115 types = {type(v) for v in values} 

1116 merged_items = None 

1117 key_values = {} 

1118 attributes = set() 

1119 for v in values: 

1120 if policy.can_call(v.__dir__): 

1121 attributes.update(dir(v)) 

1122 try: 

1123 if policy.can_call(v.items): 

1124 try: 

1125 for k, val in v.items(): 

1126 key_values.setdefault(k, []).append(val) 

1127 except Exception as e: 

1128 pass 

1129 elif policy.can_call(v.keys): 

1130 try: 

1131 for k in v.keys(): 

1132 key_values.setdefault(k, []).append(None) 

1133 except Exception as e: 

1134 pass 

1135 except Exception as e: 

1136 pass 

1137 

1138 if key_values: 

1139 merged_items = { 

1140 k: _merge_values(vals, policy) if vals[0] is not None else None 

1141 for k, vals in key_values.items() 

1142 } 

1143 

1144 if len(types) == 1: 

1145 t = next(iter(types)) 

1146 if t not in (dict,) and not ( 

1147 hasattr(next(iter(values)), "__getitem__") 

1148 and ( 

1149 hasattr(next(iter(values)), "items") 

1150 or hasattr(next(iter(values)), "keys") 

1151 ) 

1152 ): 

1153 if t in (list, set, tuple): 

1154 return t 

1155 return values[0] 

1156 

1157 return _Duck(attributes=dict.fromkeys(attributes), items=merged_items) 

1158 

1159 

1160def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext): 

1161 """Infer the return value(s) of a function by evaluating all return statements.""" 

1162 return_values = _collect_return_values(node.body, context) 

1163 

1164 if not return_values: 

1165 return None 

1166 if len(return_values) == 1: 

1167 return return_values[0] 

1168 

1169 policy = get_policy(context) 

1170 return _merge_values(return_values, policy) 

1171 

1172 

1173def _collect_return_values(body, context): 

1174 """Recursively collect return values from a list of AST statements.""" 

1175 return_values = [] 

1176 for stmt in body: 

1177 if isinstance(stmt, ast.Return): 

1178 if stmt.value is None: 

1179 continue 

1180 try: 

1181 value = eval_node(stmt.value, context) 

1182 if value is not None and value is not NOT_EVALUATED: 

1183 return_values.append(value) 

1184 except Exception: 

1185 pass 

1186 if isinstance( 

1187 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda) 

1188 ): 

1189 continue 

1190 elif hasattr(stmt, "body") and isinstance(stmt.body, list): 

1191 return_values.extend(_collect_return_values(stmt.body, context)) 

1192 if isinstance(stmt, ast.Try): 

1193 for h in stmt.handlers: 

1194 if hasattr(h, "body"): 

1195 return_values.extend(_collect_return_values(h.body, context)) 

1196 if hasattr(stmt, "orelse"): 

1197 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1198 if hasattr(stmt, "finalbody"): 

1199 return_values.extend(_collect_return_values(stmt.finalbody, context)) 

1200 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list): 

1201 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1202 return return_values 

1203 

1204 

1205def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext): 

1206 """Evaluate return type of a given callable function. 

1207 

1208 Returns the built-in type, a duck or NOT_EVALUATED sentinel. 

1209 """ 

1210 try: 

1211 sig = signature(func) 

1212 except ValueError: 

1213 sig = UNKNOWN_SIGNATURE 

1214 # if annotation was not stringized, or it was stringized 

1215 # but resolved by signature call we know the return type 

1216 not_empty = sig.return_annotation is not Signature.empty 

1217 if not_empty: 

1218 return _resolve_annotation(sig.return_annotation, context, sig, func, node) 

1219 return NOT_EVALUATED 

1220 

1221 

1222def _eval_annotation( 

1223 annotation: str, 

1224 context: EvaluationContext, 

1225): 

1226 return ( 

1227 _eval_node_name(annotation, context) 

1228 if isinstance(annotation, str) 

1229 else annotation 

1230 ) 

1231 

1232 

1233class _GetItemDuck(dict): 

1234 """A dict subclass that always returns the factory instance and claims to have any item.""" 

1235 

1236 def __init__(self, factory, *args, **kwargs): 

1237 super().__init__(*args, **kwargs) 

1238 self._factory = factory 

1239 

1240 def __getitem__(self, key): 

1241 return self._factory() 

1242 

1243 def __contains__(self, key): 

1244 return True 

1245 

1246 

1247def _resolve_annotation( 

1248 annotation: object | str, 

1249 context: EvaluationContext, 

1250 sig: Signature | None = None, 

1251 func: Callable | None = None, 

1252 node: ast.Call | None = None, 

1253): 

1254 """Resolve annotation created by user with `typing` module and custom objects.""" 

1255 if annotation is None: 

1256 return None 

1257 annotation = _eval_annotation(annotation, context) 

1258 origin = get_origin(annotation) 

1259 if annotation is Self and func and hasattr(func, "__self__"): 

1260 return func.__self__ 

1261 elif origin is Literal: 

1262 type_args = get_args(annotation) 

1263 if len(type_args) == 1: 

1264 return type_args[0] 

1265 elif annotation is LiteralString: 

1266 return "" 

1267 elif annotation is AnyStr: 

1268 index = None 

1269 if func and hasattr(func, "__node__"): 

1270 def_node = func.__node__ 

1271 for i, arg in enumerate(def_node.args.args): 

1272 if not arg.annotation: 

1273 continue 

1274 annotation = _eval_annotation(arg.annotation.id, context) 

1275 if annotation is AnyStr: 

1276 index = i 

1277 break 

1278 is_bound_method = ( 

1279 isinstance(func, MethodType) and getattr(func, "__self__") is not None 

1280 ) 

1281 if index and is_bound_method: 

1282 index -= 1 

1283 elif sig: 

1284 for i, (key, value) in enumerate(sig.parameters.items()): 

1285 if value.annotation is AnyStr: 

1286 index = i 

1287 break 

1288 if index is None: 

1289 return None 

1290 if index < 0 or index >= len(node.args): 

1291 return None 

1292 return eval_node(node.args[index], context) 

1293 elif origin is TypeGuard: 

1294 return False 

1295 elif origin is set or origin is list: 

1296 # only one type argument allowed 

1297 attributes = [ 

1298 attr 

1299 for attr in dir( 

1300 _resolve_annotation(get_args(annotation)[0], context, sig, func, node) 

1301 ) 

1302 ] 

1303 duck = _Duck(attributes=dict.fromkeys(attributes)) 

1304 return _Duck( 

1305 attributes=dict.fromkeys(dir(origin())), 

1306 # items are not strrictly needed for set 

1307 items=_GetItemDuck(lambda: duck), 

1308 ) 

1309 elif origin is tuple: 

1310 # multiple type arguments 

1311 return tuple( 

1312 _resolve_annotation(arg, context, sig, func, node) 

1313 for arg in get_args(annotation) 

1314 ) 

1315 elif origin is Union: 

1316 # multiple type arguments 

1317 attributes = [ 

1318 attr 

1319 for type_arg in get_args(annotation) 

1320 for attr in dir(_resolve_annotation(type_arg, context, sig, func, node)) 

1321 ] 

1322 return _Duck(attributes=dict.fromkeys(attributes)) 

1323 elif is_typeddict(annotation): 

1324 return _Duck( 

1325 attributes=dict.fromkeys(dir(dict())), 

1326 items={ 

1327 k: _resolve_annotation(v, context, sig, func, node) 

1328 for k, v in annotation.__annotations__.items() 

1329 }, 

1330 ) 

1331 elif hasattr(annotation, "_is_protocol"): 

1332 return _Duck(attributes=dict.fromkeys(dir(annotation))) 

1333 elif origin is Annotated: 

1334 type_arg = get_args(annotation)[0] 

1335 return _resolve_annotation(type_arg, context, sig, func, node) 

1336 elif isinstance(annotation, NewType): 

1337 return _eval_or_create_duck(annotation.__supertype__, context) 

1338 elif isinstance(annotation, TypeAliasType): 

1339 return _eval_or_create_duck(annotation.__value__, context) 

1340 else: 

1341 return _eval_or_create_duck(annotation, context) 

1342 

1343 

1344def _eval_node_name(node_id: str, context: EvaluationContext): 

1345 policy = get_policy(context) 

1346 if node_id in context.transient_locals: 

1347 return context.transient_locals[node_id] 

1348 if policy.allow_locals_access and node_id in context.locals: 

1349 return context.locals[node_id] 

1350 if policy.allow_globals_access and node_id in context.globals: 

1351 return context.globals[node_id] 

1352 if policy.allow_builtins_access and hasattr(builtins, node_id): 

1353 # note: do not use __builtins__, it is implementation detail of cPython 

1354 return getattr(builtins, node_id) 

1355 if policy.allow_auto_import and context.auto_import: 

1356 return context.auto_import(node_id) 

1357 if not policy.allow_globals_access and not policy.allow_locals_access: 

1358 raise GuardRejection( 

1359 f"Namespace access not allowed in {context.evaluation} mode" 

1360 ) 

1361 else: 

1362 raise NameError(f"{node_id} not found in locals, globals, nor builtins") 

1363 

1364 

1365def _eval_or_create_duck(duck_type, context: EvaluationContext): 

1366 policy = get_policy(context) 

1367 # if allow-listed builtin is on type annotation, instantiate it 

1368 if policy.can_call(duck_type): 

1369 return duck_type() 

1370 # if custom class is in type annotation, mock it 

1371 return _create_duck_for_heap_type(duck_type) 

1372 

1373 

1374def _create_duck_for_heap_type(duck_type): 

1375 """Create an imitation of an object of a given type (a duck). 

1376 

1377 Returns the duck or NOT_EVALUATED sentinel if duck could not be created. 

1378 """ 

1379 duck = ImpersonatingDuck() 

1380 try: 

1381 # this only works for heap types, not builtins 

1382 duck.__class__ = duck_type 

1383 return duck 

1384 except TypeError: 

1385 pass 

1386 return NOT_EVALUATED 

1387 

1388 

1389SUPPORTED_EXTERNAL_GETITEM = { 

1390 ("pandas", "core", "indexing", "_iLocIndexer"), 

1391 ("pandas", "core", "indexing", "_LocIndexer"), 

1392 ("pandas", "DataFrame"), 

1393 ("pandas", "Series"), 

1394 ("numpy", "ndarray"), 

1395 ("numpy", "void"), 

1396} 

1397 

1398 

1399BUILTIN_GETITEM: set[InstancesHaveGetItem] = { 

1400 dict, 

1401 str, # type: ignore[arg-type] 

1402 bytes, # type: ignore[arg-type] 

1403 list, 

1404 tuple, 

1405 type, # for type annotations like list[str] 

1406 _Duck, 

1407 collections.defaultdict, 

1408 collections.deque, 

1409 collections.OrderedDict, 

1410 collections.ChainMap, 

1411 collections.UserDict, 

1412 collections.UserList, 

1413 collections.UserString, # type: ignore[arg-type] 

1414 _DummyNamedTuple, 

1415 _IdentitySubscript, 

1416} 

1417 

1418 

1419def _list_methods(cls, source=None): 

1420 """For use on immutable objects or with methods returning a copy""" 

1421 return [getattr(cls, k) for k in (source if source else dir(cls))] 

1422 

1423 

1424dict_non_mutating_methods = ("copy", "keys", "values", "items") 

1425list_non_mutating_methods = ("copy", "index", "count") 

1426set_non_mutating_methods = set(dir(set)) & set(dir(frozenset)) 

1427 

1428 

1429dict_keys: type[collections.abc.KeysView] = type({}.keys()) 

1430dict_values: type = type({}.values()) 

1431dict_items: type = type({}.items()) 

1432 

1433NUMERICS = {int, float, complex} 

1434 

1435ALLOWED_CALLS = { 

1436 bytes, 

1437 *_list_methods(bytes), 

1438 bytes.__iter__, 

1439 dict, 

1440 *_list_methods(dict, dict_non_mutating_methods), 

1441 dict.__iter__, 

1442 dict_keys.__iter__, 

1443 dict_values.__iter__, 

1444 dict_items.__iter__, 

1445 dict_keys.isdisjoint, 

1446 list, 

1447 *_list_methods(list, list_non_mutating_methods), 

1448 list.__iter__, 

1449 set, 

1450 *_list_methods(set, set_non_mutating_methods), 

1451 set.__iter__, 

1452 frozenset, 

1453 *_list_methods(frozenset), 

1454 frozenset.__iter__, 

1455 range, 

1456 range.__iter__, 

1457 str, 

1458 *_list_methods(str), 

1459 str.__iter__, 

1460 tuple, 

1461 *_list_methods(tuple), 

1462 tuple.__iter__, 

1463 bool, 

1464 *_list_methods(bool), 

1465 *NUMERICS, 

1466 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)], 

1467 collections.deque, 

1468 *_list_methods(collections.deque, list_non_mutating_methods), 

1469 collections.deque.__iter__, 

1470 collections.defaultdict, 

1471 *_list_methods(collections.defaultdict, dict_non_mutating_methods), 

1472 collections.defaultdict.__iter__, 

1473 collections.OrderedDict, 

1474 *_list_methods(collections.OrderedDict, dict_non_mutating_methods), 

1475 collections.OrderedDict.__iter__, 

1476 collections.UserDict, 

1477 *_list_methods(collections.UserDict, dict_non_mutating_methods), 

1478 collections.UserDict.__iter__, 

1479 collections.UserList, 

1480 *_list_methods(collections.UserList, list_non_mutating_methods), 

1481 collections.UserList.__iter__, 

1482 collections.UserString, 

1483 *_list_methods(collections.UserString, dir(str)), 

1484 collections.UserString.__iter__, 

1485 collections.Counter, 

1486 *_list_methods(collections.Counter, dict_non_mutating_methods), 

1487 collections.Counter.__iter__, 

1488 collections.Counter.elements, 

1489 collections.Counter.most_common, 

1490 object.__dir__, 

1491 type.__dir__, 

1492 _Duck.__dir__, 

1493} 

1494 

1495BUILTIN_GETATTR: set[MayHaveGetattr] = { 

1496 *BUILTIN_GETITEM, 

1497 set, 

1498 frozenset, 

1499 object, 

1500 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`. 

1501 *NUMERICS, 

1502 dict_keys, 

1503 MethodDescriptorType, 

1504 ModuleType, 

1505} 

1506 

1507 

1508BUILTIN_OPERATIONS = {*BUILTIN_GETATTR} 

1509 

1510EVALUATION_POLICIES = { 

1511 "minimal": EvaluationPolicy( 

1512 allow_builtins_access=True, 

1513 allow_locals_access=False, 

1514 allow_globals_access=False, 

1515 allow_item_access=False, 

1516 allow_attr_access=False, 

1517 allowed_calls=set(), 

1518 allow_any_calls=False, 

1519 allow_all_operations=False, 

1520 ), 

1521 "limited": SelectivePolicy( 

1522 allowed_getitem=BUILTIN_GETITEM, 

1523 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM, 

1524 allowed_getattr=BUILTIN_GETATTR, 

1525 allowed_getattr_external={ 

1526 # pandas Series/Frame implements custom `__getattr__` 

1527 ("pandas", "DataFrame"), 

1528 ("pandas", "Series"), 

1529 }, 

1530 allowed_operations=BUILTIN_OPERATIONS, 

1531 allow_builtins_access=True, 

1532 allow_locals_access=True, 

1533 allow_globals_access=True, 

1534 allow_getitem_on_types=True, 

1535 allowed_calls=ALLOWED_CALLS, 

1536 ), 

1537 "unsafe": EvaluationPolicy( 

1538 allow_builtins_access=True, 

1539 allow_locals_access=True, 

1540 allow_globals_access=True, 

1541 allow_attr_access=True, 

1542 allow_item_access=True, 

1543 allow_any_calls=True, 

1544 allow_all_operations=True, 

1545 ), 

1546} 

1547 

1548 

1549__all__ = [ 

1550 "guarded_eval", 

1551 "eval_node", 

1552 "GuardRejection", 

1553 "EvaluationContext", 

1554 "_unbind_method", 

1555]