Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

809 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import dataclasses 

23import operator 

24import sys 

25import typing 

26import warnings 

27from functools import cached_property 

28from dataclasses import dataclass, field 

29from types import MethodDescriptorType, ModuleType, MethodType 

30 

31from IPython.utils.decorators import undoc 

32 

33 

34from typing import Self, LiteralString 

35 

36if sys.version_info < (3, 12): 

37 from typing_extensions import TypeAliasType 

38else: 

39 from typing import TypeAliasType 

40 

41 

@undoc
class HasGetItem(Protocol):
    """Structural type for objects that can be subscripted."""

    def __getitem__(self, key) -> None:
        ...

45 

46 

@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type for callables (e.g. classes) producing subscriptable objects."""

    def __call__(self, *args, **kwargs) -> HasGetItem:
        ...

50 

51 

@undoc
class HasGetAttr(Protocol):
    """Structural type for objects that define ``__getattr__`` explicitly."""

    def __getattr__(self, key) -> None:
        ...

55 

56 

@undoc
class DoesNotHaveGetAttr(Protocol):
    """Structural type with no members; counterpart of ``HasGetAttr``."""

60 

61 

# By default `__getattr__` is not explicitly implemented on most objects,
# so the alias accepts both objects that define it and objects that do not.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]

64 

65 

66def _unbind_method(func: Callable) -> Union[Callable, None]: 

67 """Get unbound method for given bound method. 

68 

69 Returns None if cannot get unbound method, or method is already unbound. 

70 """ 

71 owner = getattr(func, "__self__", None) 

72 owner_class = type(owner) 

73 name = getattr(func, "__name__", None) 

74 instance_dict_overrides = getattr(owner, "__dict__", None) 

75 if ( 

76 owner is not None 

77 and name 

78 and ( 

79 not instance_dict_overrides 

80 or (instance_dict_overrides and name not in instance_dict_overrides) 

81 ) 

82 ): 

83 return getattr(owner_class, name) 

84 return None 

85 

86 

@undoc
@dataclass
class EvaluationPolicy:
    """Base evaluation policy: a set of boolean switches plus a call allow-list.

    Each ``can_*`` method answers whether the corresponding kind of
    evaluation is permitted under this policy.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return the ``allow_item_access`` switch; arguments are not inspected."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return the ``allow_attr_access`` switch; arguments are not inspected."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return ``True`` if all operations are allowed, otherwise ``None``."""
        return True if self.allow_all_operations else None

    def can_call(self, func):
        """Return ``True`` if calling ``func`` is allowed, otherwise ``None``.

        A call is allowed when any call is allowed, when ``func`` itself is
        in ``allowed_calls``, or when its unbound class-level counterpart is.
        """
        if self.allow_any_calls or func in self.allowed_calls:
            return True

        unbound = _unbind_method(func)
        if unbound and unbound in self.allowed_calls:
            return True

        return None

123 

124 

125def _get_external(module_name: str, access_path: Sequence[str]): 

126 """Get value from external module given a dotted access path. 

127 

128 Only gets value if the module is already imported. 

129 

130 Raises: 

131 * `KeyError` if module is removed not found, and 

132 * `AttributeError` if access path does not match an exported object 

133 """ 

134 try: 

135 member_type = sys.modules[module_name] 

136 # standard module 

137 for attr in access_path: 

138 member_type = getattr(member_type, attr) 

139 return member_type 

140 except (KeyError, AttributeError): 

141 # handle modules in namespace packages 

142 module_path = ".".join([module_name, *access_path]) 

143 if module_path in sys.modules: 

144 return sys.modules[module_path] 

145 raise 

146 

147 

def _has_original_dunder_external(
    value,
    module_name: str,
    access_path: Sequence[str],
    method_name: str,
):
    """Check ``value`` against one externally allow-listed class or module.

    Returns ``True`` when the type of ``value`` is the allow-listed object,
    lives in (a submodule of) the allow-listed module, or carries a
    ``method_name`` equal to the one on the allow-listed class / on a base
    class from the allow-listed module. Returns ``False`` when the module
    is not imported, on lookup errors, and for ``__getattribute__`` once the
    direct checks fail. Falls through to ``None`` otherwise.
    """
    # Look before you leap: cheaper than raising when nothing is imported.
    if (
        module_name not in sys.modules
        and ".".join([module_name, *access_path]) not in sys.modules
    ):
        return False

    try:
        trusted = _get_external(module_name, access_path)
        value_type = type(value)
        if value_type == trusted:
            return True

        trusted_is_module = isinstance(trusted, ModuleType)

        def _from_trusted_module(module) -> bool:
            # Exact module, or any of its submodules.
            return module.__name__ == trusted.__name__ or module.__name__.startswith(
                trusted.__name__ + "."
            )

        if trusted_is_module:
            value_module = getmodule(value_type)
            if not value_module or not value_module.__name__:
                return False
            if _from_trusted_module(value_module):
                return True

        if method_name == "__getattribute__":
            # we have to short-circuit here due to an unresolved issue in
            # `isinstance` implementation: https://bugs.python.org/issue32683
            return False

        if not trusted_is_module:
            if isinstance(value, trusted):
                own_method = getattr(value_type, method_name, None)
                trusted_method = getattr(trusted, method_name, None)
                if trusted_method == own_method:
                    return True
            return None

        own_method = getattr(value_type, method_name, None)
        for base in value_type.__mro__[1:]:
            base_module = getmodule(base)
            if base_module and _from_trusted_module(base_module):
                # Check if the method comes from this trusted base class
                inherited = getattr(base, method_name, None)
                if inherited is not None and inherited == own_method:
                    return True
    except (AttributeError, KeyError):
        return False
    return None

196 

197 

198def _has_original_dunder( 

199 value, allowed_types, allowed_methods, allowed_external, method_name 

200): 

201 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

202 # we only need to check at class level 

203 value_type = type(value) 

204 

205 # strict type check passes → no need to check method 

206 if value_type in allowed_types: 

207 return True 

208 

209 method = getattr(value_type, method_name, None) 

210 

211 if method is None: 

212 return None 

213 

214 if method in allowed_methods: 

215 return True 

216 

217 for module_name, *access_path in allowed_external: 

218 if _has_original_dunder_external(value, module_name, access_path, method_name): 

219 return True 

220 

221 return False 

222 

223 

224def _coerce_path_to_tuples( 

225 allow_list: set[tuple[str, ...] | str], 

226) -> set[tuple[str, ...]]: 

227 """Replace dotted paths on the provided allow-list with tuples.""" 

228 return { 

229 path if isinstance(path, tuple) else tuple(path.split(".")) 

230 for path in allow_list 

231 } 

232 

233 

@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy that permits access only through allow-listed, unmodified dunders.

    Each ``allowed_*`` set lists trusted types directly; each
    ``allowed_*_external`` set lists trusted objects by dotted path (string
    or tuple) that are resolved from ``sys.modules`` without importing.
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allow_getitem_on_types: bool = field(default_factory=bool)

    # Per-dunder cache of the methods found on ``allowed_operations``.
    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Return whether reading ``attr`` from ``value`` is allowed.

        Access is accepted when ``__getattribute__`` is trusted and
        ``__getattr__`` is either absent or trusted. Properties are accepted
        only if the type is allow-listed, or if the property object equals
        the one defined on some externally allow-listed class.
        """
        allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)

        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattr__",
        )

        accept = False

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if accept:
            # We still need to check for overridden properties.

            value_class = type(value)
            if not hasattr(value_class, attr):
                return True

            class_attr_val = getattr(value_class, attr)
            is_property = isinstance(class_attr_val, property)

            if not is_property:
                return True

            # Properties in allowed types are ok (although we do not include any
            # properties in our default allow list currently).
            if type(value) in self.allowed_getattr:
                return True  # pragma: no cover

            # Properties in subclasses of allowed types may be ok if not changed.
            # Every allow-listed entry is consulted: returning on the first one
            # would make the answer depend on arbitrary set iteration order.
            for module_name, *access_path in allowed_getattr_external:
                try:
                    external_class = _get_external(module_name, access_path)
                    external_class_attr_val = getattr(external_class, attr)
                except (KeyError, AttributeError):
                    # Not imported, or this entry has no such attribute.
                    continue
                if class_attr_val == external_class_attr_val:
                    return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
        if self.allow_getitem_on_types:
            # e.g. Union[str, int] or Literal[True, 1]
            if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
                return True
            # PEP 560 e.g. list[str]
            if isinstance(value, type) and hasattr(value, "__class_getitem__"):
                return True
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether every dunder in ``dunders`` is trusted on ``a`` (and ``b``)."""
        allowed_operations_external = _coerce_path_to_tuples(
            self.allowed_operations_external
        )
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and cache) the ``dunder`` methods of the allowed operation types."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        """``__getitem__`` methods of the types in ``allowed_getitem``."""
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        """``__getattr__`` methods of the types in ``allowed_getattr``."""
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        """``__getattribute__`` methods of the types in ``allowed_getattr``."""
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect the truthy ``name`` attributes of ``classes``, skipping missing ones."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }

373 

374 

375class _DummyNamedTuple(NamedTuple): 

376 """Used internally to retrieve methods of named tuple instance.""" 

377 

378 

# Closed set of identifiers accepted as ``EvaluationContext.evaluation``.
EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]

380 

381 

@dataclass
class EvaluationContext:
    """State for one guarded evaluation: namespaces, policy name and transients."""

    #: Local namespace
    locals: dict
    #: Global namespace
    globals: dict
    #: Evaluation policy identifier
    evaluation: EvaluationPolicyName = "forbidden"
    #: Whether the evaluation of code takes place inside of a subscript.
    #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
    in_subscript: bool = False
    #: Auto import method (a callable taking a list of strings)
    auto_import: Callable[[list[str]], ModuleType] | None = None
    #: Overrides for evaluation policy
    policy_overrides: dict = field(default_factory=dict)
    #: Transient local namespace used to store mocks
    transient_locals: dict = field(default_factory=dict)
    #: Transients of class level
    class_transients: dict | None = None
    #: Instance variable name used in the method definition
    instance_arg_name: str | None = None

    def replace(self, /, **changes):
        """Return a new copy of the context, with specified changes"""
        return dataclasses.replace(self, **changes)

407 

408 

409class _IdentitySubscript: 

410 """Returns the key itself when item is requested via subscript.""" 

411 

412 def __getitem__(self, key): 

413 return key 

414 

415 

# Shared instance that ``guarded_eval`` places in locals under SUBSCRIPT_MARKER.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name that ``guarded_eval`` prepends to code evaluated in subscript mode.
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# Empty signature object.
UNKNOWN_SIGNATURE = Signature()
# Unique sentinel object.
NOT_EVALUATED = object()

420 

421 

class GuardRejection(Exception):
    """Raised when the guard refuses to carry out an evaluation attempt."""

426 

427 

def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate ``code`` under the restrictions described by ``context``.

    - ``forbidden`` policy: nothing is evaluated, :class:`GuardRejection`
      is raised.
    - ``dangerous`` policy: the code is passed to the built-in :func:`eval`.
    - any other policy: the code is parsed and handed to :func:`eval_node`.

    When ``context.in_subscript`` is set, empty code yields an empty tuple
    and non-empty code is wrapped in a marker subscript before evaluation.
    """
    locals_ = context.locals

    if context.evaluation == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: not using `ast.literal_eval` as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # Slice syntax (`:`) is only valid inside a subscript, so the code is
        # wrapped as `MARKER[code]`; the marker resolves to an object whose
        # `__getitem__` returns the key, letting us recover the subscript value.
        if not code:
            return ()
        locals_ = locals_.copy()
        locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = SUBSCRIPT_MARKER + "[" + code + "]"
        context = context.replace(locals=locals_)

    if context.evaluation == "dangerous":
        return eval(code, context.globals, context.locals)

    tree = ast.parse(code, mode="exec")
    return eval_node(tree, context)

462 

463 

# Binary operator AST node type -> names of the dunder(s) implementing it.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}

# Comparison operator AST node type -> names of the dunder(s) involved.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

# Unary operator AST node type -> names of the dunder(s) involved.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    ast.Not: ("__not__",),
}

498 

499 

class ImpersonatingDuck:
    """Empty placeholder class for building objects of other classes without running their ``__init__``."""

    # Intentionally empty: ``__class__`` is overridden to impersonate.

504 

505 

506class _Duck: 

507 """A dummy class used to create objects pretending to have given attributes""" 

508 

509 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

510 self.attributes = attributes if attributes is not None else {} 

511 self.items = items if items is not None else {} 

512 

513 def __getattr__(self, attr: str): 

514 return self.attributes[attr] 

515 

516 def __hasattr__(self, attr: str): 

517 return attr in self.attributes 

518 

519 def __dir__(self): 

520 return [*dir(super), *self.attributes] 

521 

522 def __getitem__(self, key: str): 

523 return self.items[key] 

524 

525 def __hasitem__(self, key: str): 

526 return self.items[key] 

527 

528 def _ipython_key_completions_(self): 

529 return self.items.keys() 

530 

531 

532def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

533 dunder = None 

534 for op, candidate_dunder in dunders.items(): 

535 if isinstance(node_op, op): 

536 dunder = candidate_dunder 

537 return dunder 

538 

539 

def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Build the effective policy for ``context``.

    A shallow copy of the registered policy for ``context.evaluation`` is
    made, then each entry of ``context.policy_overrides`` whose key is an
    existing attribute of the policy is applied; other keys are ignored.
    """
    effective = copy(EVALUATION_POLICIES[context.evaluation])

    for name, override in context.policy_overrides.items():
        if not hasattr(effective, name):
            continue
        setattr(effective, name, override)
    return effective

547 

548 

def _validate_policy_overrides(
    policy_name: EvaluationPolicyName, policy_overrides: dict
) -> bool:
    """Warn about override keys that the named policy does not have.

    Returns ``True`` if every key in ``policy_overrides`` is an attribute of
    the policy registered under ``policy_name``, ``False`` otherwise. One
    warning is emitted per unknown key.
    """
    policy = EVALUATION_POLICIES[policy_name]

    valid = True
    for key, _ in policy_overrides.items():
        if hasattr(policy, key):
            continue
        warnings.warn(
            f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
        )
        valid = False
    return valid

562 

563 

def _handle_assign(node: ast.Assign, context: EvaluationContext):
    """Record the effect of an assignment in the transient namespaces.

    The right-hand side is evaluated once. Each target is then handled as:

    - tuple/list target: the value is unpacked (one starred target is
      supported) and each element is stored;
    - subscript target on a name or on an instance attribute: the container
      is replaced by a ``_Duck`` carrying the assigned item;
    - anything else: stored under the attribute name in
      ``context.class_transients`` when the target is an attribute of the
      instance argument, otherwise under the name in
      ``context.transient_locals``.

    Always returns ``None``.
    """
    value = eval_node(node.value, context)
    transient_locals = context.transient_locals
    policy = get_policy(context)
    class_transients = context.class_transients

    def _store(target, item):
        # ``self.x = ...`` goes to class transients, plain names go to locals.
        if _is_instance_attribute_assignment(target, context):
            class_transients[target.attr] = item
        else:
            transient_locals[target.id] = item

    for target in node.targets:
        if isinstance(target, (ast.Tuple, ast.List)):
            # Handle unpacking assignment
            values = list(value)
            targets = target.elts
            starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]

            # Unified handling: treat no starred as starred at end
            star_or_last_idx = starred[0] if starred else len(targets)

            # Before starred
            for i in range(star_or_last_idx):
                _store(targets[i], values[i])

            # Starred if exists
            if starred:
                end = len(values) - (len(targets) - star_or_last_idx - 1)
                # Unwrap the ``ast.Starred`` node first: the wrapper itself is
                # never an attribute, so ``*self.rest`` must be checked on
                # its inner value.
                _store(targets[star_or_last_idx].value, values[star_or_last_idx:end])

            # After starred: index from the end of the unpacked values
            for i in range(star_or_last_idx + 1, len(targets)):
                _store(targets[i], values[len(values) - (len(targets) - i)])
        elif isinstance(target, ast.Subscript):
            if isinstance(target.value, ast.Name):
                name = target.value.id
                container = transient_locals.get(name)
                if container is None:
                    container = context.locals.get(name)
                if container is None:
                    container = context.globals.get(name)
                if container is None:
                    raise NameError(
                        f"{name} not found in locals, globals, nor builtins"
                    )
                storage_dict = transient_locals
                storage_key = name
            elif isinstance(
                target.value, ast.Attribute
            ) and _is_instance_attribute_assignment(target.value, context):
                attr = target.value.attr
                container = class_transients.get(attr, None)
                if container is None:
                    raise NameError(f"{attr} not found in class transients")
                storage_dict = class_transients
                storage_key = attr
            else:
                # Unsupported subscript base: stop here, which also skips
                # any remaining targets.
                return

            key = eval_node(target.slice, context)
            attributes = (
                dict.fromkeys(dir(container))
                if policy.can_call(container.__dir__)
                else {}
            )
            items = {}

            if policy.can_get_item(container, None):
                # Best effort: containers without a usable ``items()`` simply
                # start from an empty mapping.
                try:
                    items = dict(container.items())
                except Exception:
                    pass

            items[key] = value
            duck_container = _Duck(attributes=attributes, items=items)
            storage_dict[storage_key] = duck_container
        else:
            _store(target, value)
    return None

659 

660 

def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
    """Evaluate the arguments of a call node.

    Returns ``(args, kwargs)``: a list of evaluated positional arguments and
    a dict of evaluated keyword arguments. A keyword without a name
    (``**mapping``) is evaluated and its ``items()`` are merged in; later
    entries overwrite earlier ones.
    """
    positional = [eval_node(arg, context) for arg in node.args]

    keyword = {}
    for kw in node.keywords:
        evaluated = eval_node(kw.value, context)
        if kw.arg:
            keyword[kw.arg] = evaluated
        else:
            for k, v in evaluated.items():
                keyword[k] = v
    return positional, keyword

673 

674 

675def _is_instance_attribute_assignment( 

676 target: ast.AST, context: EvaluationContext 

677) -> bool: 

678 """Return True if target is an attribute access on the instance argument.""" 

679 return ( 

680 context.class_transients is not None 

681 and context.instance_arg_name is not None 

682 and isinstance(target, ast.Attribute) 

683 and isinstance(getattr(target, "value", None), ast.Name) 

684 and getattr(target.value, "id", None) == context.instance_arg_name 

685 ) 

686 

687 

688def _get_coroutine_attributes() -> dict[str, Optional[object]]: 

689 async def _dummy(): 

690 return None 

691 

692 coro = _dummy() 

693 try: 

694 return {attr: getattr(coro, attr, None) for attr in dir(coro)} 

695 finally: 

696 coro.close() 

697 

698 

699def eval_node(node: Union[ast.AST, None], context: EvaluationContext): 

700 """Evaluate AST node in provided context. 

701 

702 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. 

703 

704 Does not evaluate actions that always have side effects: 

705 

706 - class definitions (``class sth: ...``) 

707 - function definitions (``def sth: ...``) 

708 - variable assignments (``x = 1``) 

709 - augmented assignments (``x += 1``) 

710 - deletions (``del x``) 

711 

712 Does not evaluate operations which do not return values: 

713 

714 - assertions (``assert x``) 

715 - pass (``pass``) 

716 - imports (``import x``) 

717 - control flow: 

718 

719 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``) 

720 - loops (``for`` and ``while``) 

721 - exception handling 

722 

723 The purpose of this function is to guard against unwanted side-effects; 

724 it does not give guarantees on protection from malicious code execution. 

725 """ 

726 policy = get_policy(context) 

727 

728 if node is None: 

729 return None 

730 if isinstance(node, (ast.Interactive, ast.Module)): 

731 result = None 

732 for child_node in node.body: 

733 result = eval_node(child_node, context) 

734 return result 

735 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

736 is_async = isinstance(node, ast.AsyncFunctionDef) 

737 func_locals = context.transient_locals.copy() 

738 func_context = context.replace(transient_locals=func_locals) 

739 is_property = False 

740 is_static = False 

741 is_classmethod = False 

742 for decorator_node in node.decorator_list: 

743 try: 

744 decorator = eval_node(decorator_node, context) 

745 except NameError: 

746 # if the decorator is not yet defined this is fine 

747 # especialy because we don't handle imports yet 

748 continue 

749 if decorator is property: 

750 is_property = True 

751 elif decorator is staticmethod: 

752 is_static = True 

753 elif decorator is classmethod: 

754 is_classmethod = True 

755 

756 if func_context.class_transients is not None: 

757 if not is_static and not is_classmethod: 

758 func_context.instance_arg_name = ( 

759 node.args.args[0].arg if node.args.args else None 

760 ) 

761 

762 return_type = eval_node(node.returns, context=context) 

763 

764 for child_node in node.body: 

765 eval_node(child_node, func_context) 

766 

767 if is_property: 

768 if return_type is not None: 

769 context.transient_locals[node.name] = _resolve_annotation( 

770 return_type, context 

771 ) 

772 else: 

773 return_value = _infer_return_value(node, func_context) 

774 context.transient_locals[node.name] = return_value 

775 

776 return None 

777 

778 def dummy_function(*args, **kwargs): 

779 pass 

780 

781 if return_type is not None: 

782 dummy_function.__annotations__["return"] = return_type 

783 else: 

784 inferred_return = _infer_return_value(node, func_context) 

785 if inferred_return is not None: 

786 dummy_function.__inferred_return__ = inferred_return 

787 

788 dummy_function.__name__ = node.name 

789 dummy_function.__node__ = node 

790 dummy_function.__is_async__ = is_async 

791 context.transient_locals[node.name] = dummy_function 

792 return None 

793 if isinstance(node, ast.Lambda): 

794 

795 def dummy_function(*args, **kwargs): 

796 pass 

797 

798 dummy_function.__inferred_return__ = eval_node(node.body, context) 

799 return dummy_function 

800 if isinstance(node, ast.ClassDef): 

801 # TODO support class decorators? 

802 class_locals = {} 

803 outer_locals = context.locals.copy() 

804 outer_locals.update(context.transient_locals) 

805 class_context = context.replace( 

806 transient_locals=class_locals, locals=outer_locals 

807 ) 

808 class_context.class_transients = class_locals 

809 for child_node in node.body: 

810 eval_node(child_node, class_context) 

811 bases = tuple([eval_node(base, context) for base in node.bases]) 

812 dummy_class = type(node.name, bases, class_locals) 

813 context.transient_locals[node.name] = dummy_class 

814 return None 

815 if isinstance(node, ast.Await): 

816 value = eval_node(node.value, context) 

817 if hasattr(value, "__awaited_type__"): 

818 return value.__awaited_type__ 

819 return value 

820 if isinstance(node, ast.While): 

821 loop_locals = context.transient_locals.copy() 

822 loop_context = context.replace(transient_locals=loop_locals) 

823 

824 result = None 

825 for stmt in node.body: 

826 result = eval_node(stmt, loop_context) 

827 

828 policy = get_policy(context) 

829 merged_locals = _merge_dicts_by_key( 

830 [loop_locals, context.transient_locals.copy()], policy 

831 ) 

832 context.transient_locals.update(merged_locals) 

833 

834 return result 

835 if isinstance(node, ast.For): 

836 try: 

837 iterable = eval_node(node.iter, context) 

838 except Exception: 

839 iterable = None 

840 

841 sample = None 

842 if iterable is not None: 

843 try: 

844 if policy.can_call(getattr(iterable, "__iter__", None)): 

845 sample = next(iter(iterable)) 

846 except Exception: 

847 sample = None 

848 

849 loop_locals = context.transient_locals.copy() 

850 loop_context = context.replace(transient_locals=loop_locals) 

851 

852 if sample is not None: 

853 try: 

854 fake_assign = ast.Assign( 

855 targets=[node.target], value=ast.Constant(value=sample) 

856 ) 

857 _handle_assign(fake_assign, loop_context) 

858 except Exception: 

859 pass 

860 

861 result = None 

862 for stmt in node.body: 

863 result = eval_node(stmt, loop_context) 

864 

865 policy = get_policy(context) 

866 merged_locals = _merge_dicts_by_key( 

867 [loop_locals, context.transient_locals.copy()], policy 

868 ) 

869 context.transient_locals.update(merged_locals) 

870 

871 return result 

872 if isinstance(node, ast.If): 

873 branches = [] 

874 current = node 

875 result = None 

876 while True: 

877 branch_locals = context.transient_locals.copy() 

878 branch_context = context.replace(transient_locals=branch_locals) 

879 for stmt in current.body: 

880 result = eval_node(stmt, branch_context) 

881 branches.append(branch_locals) 

882 if not current.orelse: 

883 break 

884 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If): 

885 # It's an elif - continue loop 

886 current = current.orelse[0] 

887 else: 

888 # It's an else block - process and break 

889 else_locals = context.transient_locals.copy() 

890 else_context = context.replace(transient_locals=else_locals) 

891 for stmt in current.orelse: 

892 result = eval_node(stmt, else_context) 

893 branches.append(else_locals) 

894 break 

895 branches.append(context.transient_locals.copy()) 

896 policy = get_policy(context) 

897 merged_locals = _merge_dicts_by_key(branches, policy) 

898 context.transient_locals.update(merged_locals) 

899 return result 

900 if isinstance(node, ast.Assign): 

901 return _handle_assign(node, context) 

902 if isinstance(node, ast.AnnAssign): 

903 if node.simple: 

904 value = _resolve_annotation(eval_node(node.annotation, context), context) 

905 context.transient_locals[node.target.id] = value 

906 # Handle non-simple annotated assignments only for self.x: type = value 

907 if _is_instance_attribute_assignment(node.target, context): 

908 value = _resolve_annotation(eval_node(node.annotation, context), context) 

909 context.class_transients[node.target.attr] = value 

910 return None 

911 if isinstance(node, ast.Expression): 

912 return eval_node(node.body, context) 

913 if isinstance(node, ast.Expr): 

914 return eval_node(node.value, context) 

915 if isinstance(node, ast.Pass): 

916 return None 

917 if isinstance(node, ast.Import): 

918 # TODO: populate transient_locals 

919 return None 

920 if isinstance(node, (ast.AugAssign, ast.Delete)): 

921 return None 

922 if isinstance(node, (ast.Global, ast.Nonlocal)): 

923 return None 

924 if isinstance(node, ast.BinOp): 

925 left = eval_node(node.left, context) 

926 right = eval_node(node.right, context) 

927 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) 

928 if dunders: 

929 if policy.can_operate(dunders, left, right): 

930 return getattr(left, dunders[0])(right) 

931 else: 

932 raise GuardRejection( 

933 f"Operation (`{dunders}`) for", 

934 type(left), 

935 f"not allowed in {context.evaluation} mode", 

936 ) 

937 if isinstance(node, ast.Compare): 

938 left = eval_node(node.left, context) 

939 all_true = True 

940 negate = False 

941 for op, right in zip(node.ops, node.comparators): 

942 right = eval_node(right, context) 

943 dunder = None 

944 dunders = _find_dunder(op, COMP_OP_DUNDERS) 

945 if not dunders: 

946 if isinstance(op, ast.NotIn): 

947 dunders = COMP_OP_DUNDERS[ast.In] 

948 negate = True 

949 if isinstance(op, ast.Is): 

950 dunder = "is_" 

951 if isinstance(op, ast.IsNot): 

952 dunder = "is_" 

953 negate = True 

954 if not dunder and dunders: 

955 dunder = dunders[0] 

956 if dunder: 

957 a, b = (right, left) if dunder == "__contains__" else (left, right) 

958 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): 

959 result = getattr(operator, dunder)(a, b) 

960 if negate: 

961 result = not result 

962 if not result: 

963 all_true = False 

964 left = right 

965 else: 

966 raise GuardRejection( 

967 f"Comparison (`{dunder}`) for", 

968 type(left), 

969 f"not allowed in {context.evaluation} mode", 

970 ) 

971 else: 

972 raise ValueError( 

973 f"Comparison `{dunder}` not supported" 

974 ) # pragma: no cover 

975 return all_true 

976 if isinstance(node, ast.Constant): 

977 return node.value 

978 if isinstance(node, ast.Tuple): 

979 return tuple(eval_node(e, context) for e in node.elts) 

980 if isinstance(node, ast.List): 

981 return [eval_node(e, context) for e in node.elts] 

982 if isinstance(node, ast.Set): 

983 return {eval_node(e, context) for e in node.elts} 

984 if isinstance(node, ast.Dict): 

985 return dict( 

986 zip( 

987 [eval_node(k, context) for k in node.keys], 

988 [eval_node(v, context) for v in node.values], 

989 ) 

990 ) 

991 if isinstance(node, ast.Slice): 

992 return slice( 

993 eval_node(node.lower, context), 

994 eval_node(node.upper, context), 

995 eval_node(node.step, context), 

996 ) 

997 if isinstance(node, ast.UnaryOp): 

998 value = eval_node(node.operand, context) 

999 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) 

1000 if dunders: 

1001 if policy.can_operate(dunders, value): 

1002 try: 

1003 return getattr(value, dunders[0])() 

1004 except AttributeError: 

1005 raise TypeError( 

1006 f"bad operand type for unary {node.op}: {type(value)}" 

1007 ) 

1008 else: 

1009 raise GuardRejection( 

1010 f"Operation (`{dunders}`) for", 

1011 type(value), 

1012 f"not allowed in {context.evaluation} mode", 

1013 ) 

1014 if isinstance(node, ast.Subscript): 

1015 value = eval_node(node.value, context) 

1016 slice_ = eval_node(node.slice, context) 

1017 if policy.can_get_item(value, slice_): 

1018 return value[slice_] 

1019 raise GuardRejection( 

1020 "Subscript access (`__getitem__`) for", 

1021 type(value), # not joined to avoid calling `repr` 

1022 f" not allowed in {context.evaluation} mode", 

1023 ) 

1024 if isinstance(node, ast.Name): 

1025 return _eval_node_name(node.id, context) 

1026 if isinstance(node, ast.Attribute): 

1027 if ( 

1028 context.class_transients is not None 

1029 and isinstance(node.value, ast.Name) 

1030 and node.value.id == context.instance_arg_name 

1031 ): 

1032 return context.class_transients.get(node.attr) 

1033 value = eval_node(node.value, context) 

1034 if policy.can_get_attr(value, node.attr): 

1035 return getattr(value, node.attr) 

1036 raise GuardRejection( 

1037 "Attribute access (`__getattr__`) for", 

1038 type(value), # not joined to avoid calling `repr` 

1039 f"not allowed in {context.evaluation} mode", 

1040 ) 

1041 if isinstance(node, ast.IfExp): 

1042 test = eval_node(node.test, context) 

1043 if test: 

1044 return eval_node(node.body, context) 

1045 else: 

1046 return eval_node(node.orelse, context) 

1047 if isinstance(node, ast.Call): 

1048 func = eval_node(node.func, context) 

1049 if policy.can_call(func): 

1050 args, kwargs = _extract_args_and_kwargs(node, context) 

1051 return func(*args, **kwargs) 

1052 if isclass(func): 

1053 # this code path gets entered when calling class e.g. `MyClass()` 

1054 # or `my_instance.__class__()` - in both cases `func` is `MyClass`. 

1055 # Should return `MyClass` if `__new__` is not overridden, 

1056 # otherwise whatever `__new__` return type is. 

1057 overridden_return_type = _eval_return_type(func.__new__, node, context) 

1058 if overridden_return_type is not NOT_EVALUATED: 

1059 return overridden_return_type 

1060 return _create_duck_for_heap_type(func) 

1061 else: 

1062 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED) 

1063 return_type = _eval_return_type(func, node, context) 

1064 if getattr(func, "__is_async__", False): 

1065 awaited_type = ( 

1066 inferred_return if inferred_return is not None else return_type 

1067 ) 

1068 coroutine_duck = _Duck(attributes=_get_coroutine_attributes()) 

1069 coroutine_duck.__awaited_type__ = awaited_type 

1070 return coroutine_duck 

1071 if inferred_return is not NOT_EVALUATED: 

1072 return inferred_return 

1073 if return_type is not NOT_EVALUATED: 

1074 return return_type 

1075 raise GuardRejection( 

1076 "Call for", 

1077 func, # not joined to avoid calling `repr` 

1078 f"not allowed in {context.evaluation} mode", 

1079 ) 

1080 if isinstance(node, ast.Assert): 

1081 # message is always the second item, so if it is defined user would be completing 

1082 # on the message, not on the assertion test 

1083 if node.msg: 

1084 return eval_node(node.msg, context) 

1085 return eval_node(node.test, context) 

1086 return None 

1087 

1088 

def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
    """Combine several mappings into one, merging the values that share a key.

    When ``dicts`` holds exactly one mapping, that very object is returned
    (not a copy). Otherwise the result contains every key present in any of
    the mappings; the value for a key is produced by ``_merge_values`` from
    the values stored under that key, in the order the mappings are listed.
    """
    if len(dicts) == 1:
        return dicts[0]

    # Union of the keys of all mappings.
    seen_keys = set()
    for mapping in dicts:
        seen_keys.update(mapping.keys())

    result = {}
    for name in seen_keys:
        candidates = [mapping[name] for mapping in dicts if name in mapping]
        if not candidates:
            continue
        result[name] = _merge_values(candidates, policy)
    return result

1105 

1106 

def _merge_values(values, policy: EvaluationPolicy):
    """Recursively merge multiple values, combining attributes and dict items.

    A single value is returned unchanged. For several values:

    * attribute names are gathered with ``dir`` from every value whose
      ``__dir__`` the policy allows to call;
    * items are gathered per key from every value whose ``items`` (or,
      failing that, ``keys``) the policy allows to call; values reached only
      through ``keys`` are recorded as ``None``;
    * if all values share one type that is not ``dict`` and the first value
      does not look like a mapping (``__getitem__`` plus ``items``/``keys``),
      the type itself is returned for ``list``/``set``/``tuple`` and the first
      value otherwise;
    * in every other case a ``_Duck`` exposing the gathered attribute names
      and the recursively merged items is returned.
    """
    if len(values) == 1:
        return values[0]

    distinct_types = {type(item) for item in values}
    attribute_names = set()
    per_key = {}
    for item in values:
        if policy.can_call(item.__dir__):
            attribute_names.update(dir(item))
        try:
            if policy.can_call(item.items):
                try:
                    for key, entry in item.items():
                        per_key.setdefault(key, []).append(entry)
                except Exception:
                    pass
            elif policy.can_call(item.keys):
                try:
                    for key in item.keys():
                        per_key.setdefault(key, []).append(None)
                except Exception:
                    pass
        except Exception:
            # e.g. the value has no `items`/`keys` attribute at all
            pass

    merged_items = None
    if per_key:
        merged_items = {}
        for key, collected in per_key.items():
            if collected[0] is not None:
                merged_items[key] = _merge_values(collected, policy)
            else:
                merged_items[key] = None

    if len(distinct_types) == 1:
        only_type = next(iter(distinct_types))
        if only_type not in (dict,):
            sample = next(iter(values))
            mapping_like = hasattr(sample, "__getitem__") and (
                hasattr(sample, "items") or hasattr(sample, "keys")
            )
            if not mapping_like:
                if only_type in (list, set, tuple):
                    return only_type
                return values[0]

    return _Duck(attributes=dict.fromkeys(attribute_names), items=merged_items)

1155 

1156 

def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
    """Infer what a function returns by evaluating its ``return`` statements.

    Returns ``None`` when no return value could be collected, the value itself
    when exactly one was collected, and otherwise the result of merging all
    collected values with ``_merge_values`` under the policy of ``context``.
    """
    candidates = _collect_return_values(node.body, context)

    if not candidates:
        return None
    if len(candidates) > 1:
        return _merge_values(candidates, get_policy(context))
    return candidates[0]

1168 

1169 

1170def _collect_return_values(body, context): 

1171 """Recursively collect return values from a list of AST statements.""" 

1172 return_values = [] 

1173 for stmt in body: 

1174 if isinstance(stmt, ast.Return): 

1175 if stmt.value is None: 

1176 continue 

1177 try: 

1178 value = eval_node(stmt.value, context) 

1179 if value is not None and value is not NOT_EVALUATED: 

1180 return_values.append(value) 

1181 except Exception: 

1182 pass 

1183 if isinstance( 

1184 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda) 

1185 ): 

1186 continue 

1187 elif hasattr(stmt, "body") and isinstance(stmt.body, list): 

1188 return_values.extend(_collect_return_values(stmt.body, context)) 

1189 if isinstance(stmt, ast.Try): 

1190 for h in stmt.handlers: 

1191 if hasattr(h, "body"): 

1192 return_values.extend(_collect_return_values(h.body, context)) 

1193 if hasattr(stmt, "orelse"): 

1194 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1195 if hasattr(stmt, "finalbody"): 

1196 return_values.extend(_collect_return_values(stmt.finalbody, context)) 

1197 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list): 

1198 return_values.extend(_collect_return_values(stmt.orelse, context)) 

1199 return return_values 

1200 

1201 

def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Work out what calling ``func`` returns from its return annotation.

    Returns whatever ``_resolve_annotation`` produces for the annotation, or
    the ``NOT_EVALUATED`` sentinel when ``func`` carries no return annotation.
    If ``inspect.signature`` raises ``ValueError`` the ``UNKNOWN_SIGNATURE``
    placeholder is used instead of the real signature.
    """
    try:
        sig = signature(func)
    except ValueError:
        sig = UNKNOWN_SIGNATURE
    # An annotation is available here if it was not stringized, or if it was
    # stringized but already resolved by the `signature` call.
    declared = sig.return_annotation
    if declared is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(declared, context, sig, func, node)

1217 

1218 

def _eval_annotation(
    annotation: str,
    context: EvaluationContext,
):
    """Look up a string annotation by name; pass any other object through."""
    if not isinstance(annotation, str):
        return annotation
    return _eval_node_name(annotation, context)

1228 

1229 

class _GetItemDuck(dict):
    """Dict that reports every key as present and answers lookups via a factory.

    Subscripting ignores the key and returns ``factory()``; ``in`` is always
    true. The stored dict contents (if any were passed) are not consulted by
    either operation.
    """

    def __init__(self, factory, *args, **kwargs):
        # remaining arguments are forwarded to the regular `dict` constructor
        self._factory = factory
        super().__init__(*args, **kwargs)

    def __contains__(self, key):
        return True

    def __getitem__(self, key):
        produce = self._factory
        return produce()

1242 

1243 

def _resolve_annotation(
    annotation: object | str,
    context: EvaluationContext,
    sig: Signature | None = None,
    func: Callable | None = None,
    node: ast.Call | None = None,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    Parameters
    ----------
    annotation
        The annotation object, or its name as a string (looked up through
        ``_eval_annotation``).
    context
        Evaluation context used for name lookups and policy checks.
    sig, func, node
        Optional signature, callable and call node the annotation belongs to.
        They are needed to resolve ``Self`` (via ``func.__self__``) and
        ``AnyStr`` (by evaluating the call argument matching the first
        ``AnyStr``-annotated parameter).

    Returns
    -------
    A value or duck standing in for an instance of the annotated type.
    ``None`` is returned for a ``None`` annotation, for a ``Literal`` with
    more than one value, and for an ``AnyStr`` that cannot be tied to a call
    argument.
    """
    # Imported locally: the module-level `from types import ...` does not
    # include `UnionType`.
    from types import UnionType

    if annotation is None:
        return None
    annotation = _eval_annotation(annotation, context)
    origin = get_origin(annotation)
    if annotation is Self and func and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
        # several literal values: nothing to pick, falls through to `None`
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        index = None
        if func and hasattr(func, "__node__"):
            def_node = func.__node__
            for i, arg in enumerate(def_node.args.args):
                # Only plain names can be looked up here; unannotated
                # parameters and compound annotations (`list[str]`,
                # `typing.AnyStr`, ...) are skipped instead of failing on
                # the missing `.id` attribute.
                if not isinstance(arg.annotation, ast.Name):
                    continue
                if _eval_annotation(arg.annotation.id, context) is AnyStr:
                    index = i
                    break
            is_bound_method = (
                isinstance(func, MethodType) and getattr(func, "__self__") is not None
            )
            if index and is_bound_method:
                # the definition lists `self`, the call arguments do not
                index -= 1
        elif sig:
            for i, (key, value) in enumerate(sig.parameters.items()):
                if value.annotation is AnyStr:
                    index = i
                    break
        if index is None or node is None:
            return None
        if index < 0 or index >= len(node.args):
            return None
        return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is set or origin is list:
        type_args = get_args(annotation)
        if not type_args:
            # bare `typing.List` / `typing.Set`: no element type to model,
            # treat it like the plain builtin
            return _eval_or_create_duck(origin, context)
        # only one type argument allowed
        element = _resolve_annotation(type_args[0], context, sig, func, node)
        duck = _Duck(attributes=dict.fromkeys(dir(element)))
        return _Duck(
            attributes=dict.fromkeys(dir(origin())),
            # items are not strictly needed for set
            items=_GetItemDuck(lambda: duck),
        )
    elif origin is tuple:
        # multiple type arguments
        return tuple(
            _resolve_annotation(arg, context, sig, func, node)
            for arg in get_args(annotation)
        )
    elif origin is Union or origin is UnionType:
        # multiple type arguments; `X | Y` reports `types.UnionType` as its
        # origin on Python versions where it is distinct from `typing.Union`
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, context, sig, func, node)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, context, sig, func, node)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, context)
    else:
        return _eval_or_create_duck(annotation, context)

1339 

1340 

def _eval_node_name(node_id: str, context: EvaluationContext):
    """Resolve a bare name against the namespaces the active policy permits.

    Lookup order: transient locals, locals, globals, builtins, and finally the
    context's auto-import hook. Each of the last four is consulted only if the
    policy allows it.

    Raises ``GuardRejection`` when the name is not found and the policy allows
    neither locals nor globals access, and ``NameError`` otherwise.
    """
    policy = get_policy(context)

    transient = context.transient_locals
    if node_id in transient:
        return transient[node_id]
    if policy.allow_locals_access:
        local_ns = context.locals
        if node_id in local_ns:
            return local_ns[node_id]
    if policy.allow_globals_access:
        global_ns = context.globals
        if node_id in global_ns:
            return global_ns[node_id]
    if policy.allow_builtins_access and hasattr(builtins, node_id):
        # note: do not use __builtins__, it is implementation detail of cPython
        return getattr(builtins, node_id)
    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if policy.allow_globals_access or policy.allow_locals_access:
        raise NameError(f"{node_id} not found in locals, globals, nor builtins")
    raise GuardRejection(
        f"Namespace access not allowed in {context.evaluation} mode"
    )

1360 

1361 

def _eval_or_create_duck(duck_type, context: EvaluationContext):
    """Return an instance of ``duck_type``, real if allowed and imitated if not."""
    policy = get_policy(context)
    if not policy.can_call(duck_type):
        # if custom class is in type annotation, mock it
        return _create_duck_for_heap_type(duck_type)
    # if allow-listed builtin is on type annotation, instantiate it
    return duck_type()

1369 

1370 

def _create_duck_for_heap_type(duck_type):
    """Build a stand-in object that reports ``duck_type`` as its class.

    Returns the stand-in, or the ``NOT_EVALUATED`` sentinel when the class of
    the stand-in cannot be reassigned to ``duck_type``.
    """
    imitation = ImpersonatingDuck()
    try:
        # this only works for heap types, not builtins
        imitation.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return imitation

1384 

1385 

# Third-party types for which subscripting is permitted, each given as a tuple
# of dotted-name parts (presumably the module path followed by the class name).
# Passed as `allowed_getitem_external` to the "limited" evaluation policy.
SUPPORTED_EXTERNAL_GETITEM = {
    ("pandas", "core", "indexing", "_iLocIndexer"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    ("numpy", "ndarray"),
    ("numpy", "void"),
}

1394 

1395 

# Builtin, stdlib and module-internal types for which subscripting is
# permitted. Passed as `allowed_getitem` to the "limited" evaluation policy
# and also included in `BUILTIN_GETATTR`.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    type,  # for type annotations like list[str]
    _Duck,
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    _DummyNamedTuple,
    _IdentitySubscript,
}

1414 

1415 

def _list_methods(cls, source=None):
    """For use on immutable objects or with methods returning a copy.

    Return the attributes of ``cls`` named by ``source`` as a list. When
    ``source`` is ``None`` (the default) every name reported by ``dir(cls)``
    is used.

    An explicitly empty ``source`` yields an empty list: the result feeds
    allow-lists of callables, so "no names" must not silently expand to
    "every attribute".
    """
    names = dir(cls) if source is None else source
    return [getattr(cls, name) for name in names]

1419 

1420 

# Names of container methods treated as non-mutating; used with
# `_list_methods` when building `ALLOWED_CALLS`.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# every attribute name that `set` shares with the immutable `frozenset`
set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))


# Concrete types of the dict view objects, obtained from an empty dict.
dict_keys: type[collections.abc.KeysView] = type({}.keys())
dict_values: type = type({}.values())
dict_items: type = type({}.items())

# Builtin numeric types.
NUMERICS = {int, float, complex}

1431 

# Callables passed as `allowed_calls` to the "limited" evaluation policy.
# For `bytes`, `frozenset`, `str`, `tuple`, `bool` and the numeric types every
# attribute reported by `dir()` is included; for the other containers only the
# methods named in the `*_non_mutating_methods` collections, plus `__iter__`
# and a few explicitly listed extras.
ALLOWED_CALLS = {
    bytes,
    *_list_methods(bytes),
    bytes.__iter__,
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict.__iter__,
    dict_keys.__iter__,
    dict_values.__iter__,
    dict_items.__iter__,
    dict_keys.isdisjoint,
    list,
    *_list_methods(list, list_non_mutating_methods),
    list.__iter__,
    set,
    *_list_methods(set, set_non_mutating_methods),
    set.__iter__,
    frozenset,
    *_list_methods(frozenset),
    frozenset.__iter__,
    range,
    range.__iter__,
    str,
    *_list_methods(str),
    str.__iter__,
    tuple,
    *_list_methods(tuple),
    tuple.__iter__,
    bool,
    *_list_methods(bool),
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.deque.__iter__,
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.defaultdict.__iter__,
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.OrderedDict.__iter__,
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserDict.__iter__,
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserList.__iter__,
    collections.UserString,
    # `UserString` attributes are looked up by the names `str` exposes
    *_list_methods(collections.UserString, dir(str)),
    collections.UserString.__iter__,
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.__iter__,
    collections.Counter.elements,
    collections.Counter.most_common,
    object.__dir__,
    type.__dir__,
    _Duck.__dir__,
}

1491 

# Types for which attribute access is permitted: everything in
# `BUILTIN_GETITEM` plus the types listed here. Passed as `allowed_getattr`
# to the "limited" evaluation policy.
BUILTIN_GETATTR: set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Types for which operators are permitted; currently the same set of types as
# `BUILTIN_GETATTR` (copied into a separate set object). Passed as
# `allowed_operations` to the "limited" evaluation policy.
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}

1506 

# Evaluation policies keyed by mode name.
EVALUATION_POLICIES = {
    # "minimal": builtins may be looked up; locals/globals access, item and
    # attribute access, calls and operations are all switched off.
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    # "limited": namespaces are accessible; item access, attribute access,
    # operations and calls are restricted to the allow-lists defined above.
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_getitem_on_types=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    # "unsafe": every access, call and operation flag is switched on.
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}

1544 

1545 

# Names exported by `from ... import *`.
# NOTE(review): the underscore-prefixed `_unbind_method` is exported as well —
# presumably on purpose for external users; confirm before removing.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]