1from copy import copy
2from inspect import isclass, signature, Signature, getmodule
3from typing import (
4 Annotated,
5 AnyStr,
6 Callable,
7 Literal,
8 NamedTuple,
9 NewType,
10 Optional,
11 Protocol,
12 Sequence,
13 TypeGuard,
14 Union,
15 get_args,
16 get_origin,
17 is_typeddict,
18)
19import ast
20import builtins
21import collections
22import dataclasses
23import operator
24import sys
25import typing
26import warnings
27from functools import cached_property
28from dataclasses import dataclass, field
29from types import MethodDescriptorType, ModuleType, MethodType
30
31from IPython.utils.decorators import undoc
32
33
34from typing import Self, LiteralString, get_type_hints
35
36if sys.version_info < (3, 12):
37 from typing_extensions import TypeAliasType
38else:
39 from typing import TypeAliasType
40
41
@undoc
class HasGetItem(Protocol):
    """Structural type for objects that implement ``__getitem__``."""

    def __getitem__(self, key) -> None:
        ...
46
47
@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type for callables whose call result implements ``__getitem__``."""

    def __call__(self, *args, **kwargs) -> HasGetItem:
        ...
52
53
@undoc
class HasGetAttr(Protocol):
    """Structural type for objects that explicitly implement ``__getattr__``."""

    def __getattr__(self, key) -> None:
        ...
58
59
@undoc
class DoesNotHaveGetAttr(Protocol):
    """Structural type with no required members (no ``__getattr__`` needed)."""

    pass
63
64
# By default `__getattr__` is not explicitly implemented on most objects,
# so this alias accepts both objects that define it and objects that do not.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
67
68
69def _unbind_method(func: Callable) -> Union[Callable, None]:
70 """Get unbound method for given bound method.
71
72 Returns None if cannot get unbound method, or method is already unbound.
73 """
74 owner = getattr(func, "__self__", None)
75 owner_class = type(owner)
76 name = getattr(func, "__name__", None)
77 instance_dict_overrides = getattr(owner, "__dict__", None)
78 if (
79 owner is not None
80 and name
81 and (
82 not instance_dict_overrides
83 or (instance_dict_overrides and name not in instance_dict_overrides)
84 )
85 ):
86 return getattr(owner_class, name)
87 return None
88
89
@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Each ``can_*`` method answers whether the corresponding action is
    permitted; the base implementation consults only the boolean flags and
    the ``allowed_calls`` allow-list.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return whether subscript access is allowed (``allow_item_access``)."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return whether attribute access is allowed (``allow_attr_access``)."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return True if all operations are allowed, False otherwise."""
        return bool(self.allow_all_operations)

    def can_call(self, func):
        """Return True if calling ``func`` is allowed, False otherwise.

        A call is allowed when any call is allowed, when ``func`` itself is
        on ``allowed_calls``, or when the unbound counterpart of a bound
        method is on ``allowed_calls``.
        """
        if self.allow_any_calls:
            return True

        if self._is_allow_listed(func):
            return True

        owner_method = _unbind_method(func)

        if owner_method and self._is_allow_listed(owner_method):
            return True

        return False

    def _is_allow_listed(self, func) -> bool:
        """Check membership in ``allowed_calls``, tolerating unhashable objects."""
        try:
            return func in self.allowed_calls
        except TypeError:
            # An unhashable object cannot be a member of a set, so it cannot
            # have been allow-listed.
            return False
126
127
128def _get_external(module_name: str, access_path: Sequence[str]):
129 """Get value from external module given a dotted access path.
130
131 Only gets value if the module is already imported.
132
133 Raises:
134 * `KeyError` if module is removed not found, and
135 * `AttributeError` if access path does not match an exported object
136 """
137 try:
138 member_type = sys.modules[module_name]
139 # standard module
140 for attr in access_path:
141 member_type = getattr(member_type, attr)
142 return member_type
143 except (KeyError, AttributeError):
144 # handle modules in namespace packages
145 module_path = ".".join([module_name, *access_path])
146 if module_path in sys.modules:
147 return sys.modules[module_path]
148 raise
149
150
151def _has_original_dunder_external(
152 value,
153 module_name: str,
154 access_path: Sequence[str],
155 method_name: str,
156):
157 if module_name not in sys.modules:
158 full_module_path = ".".join([module_name, *access_path])
159 if full_module_path not in sys.modules:
160 # LBYLB as it is faster
161 return False
162 try:
163 member_type = _get_external(module_name, access_path)
164 value_type = type(value)
165 if type(value) == member_type:
166 return True
167 if isinstance(member_type, ModuleType):
168 value_module = getmodule(value_type)
169 if not value_module or not value_module.__name__:
170 return False
171 if (
172 value_module.__name__ == member_type.__name__
173 or value_module.__name__.startswith(member_type.__name__ + ".")
174 ):
175 return True
176 if method_name == "__getattribute__":
177 # we have to short-circuit here due to an unresolved issue in
178 # `isinstance` implementation: https://bugs.python.org/issue32683
179 return False
180 if not isinstance(member_type, ModuleType) and isinstance(value, member_type):
181 method = getattr(value_type, method_name, None)
182 member_method = getattr(member_type, method_name, None)
183 if member_method == method:
184 return True
185 if isinstance(member_type, ModuleType):
186 method = getattr(value_type, method_name, None)
187 for base_class in value_type.__mro__[1:]:
188 base_module = getmodule(base_class)
189 if base_module and (
190 base_module.__name__ == member_type.__name__
191 or base_module.__name__.startswith(member_type.__name__ + ".")
192 ):
193 # Check if the method comes from this trusted base class
194 base_method = getattr(base_class, method_name, None)
195 if base_method is not None and base_method == method:
196 return True
197 except (AttributeError, KeyError):
198 return False
199
200
201def _has_original_dunder(
202 value, allowed_types, allowed_methods, allowed_external, method_name
203):
204 # note: Python ignores `__getattr__`/`__getitem__` on instances,
205 # we only need to check at class level
206 value_type = type(value)
207
208 # strict type check passes → no need to check method
209 if value_type in allowed_types:
210 return True
211
212 method = getattr(value_type, method_name, None)
213
214 if method is None:
215 return None
216
217 if method in allowed_methods:
218 return True
219
220 for module_name, *access_path in allowed_external:
221 if _has_original_dunder_external(value, module_name, access_path, method_name):
222 return True
223
224 return False
225
226
227def _coerce_path_to_tuples(
228 allow_list: set[tuple[str, ...] | str],
229) -> set[tuple[str, ...]]:
230 """Replace dotted paths on the provided allow-list with tuples."""
231 return {
232 path if isinstance(path, tuple) else tuple(path.split("."))
233 for path in allow_list
234 }
235
236
@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy allowing item/attribute access and operations per allow-list.

    Each ``allowed_*`` set lists types directly, while the matching
    ``allowed_*_external`` set lists dotted paths (or tuples of path
    segments) that are resolved against already-imported modules.
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allow_getitem_on_types: bool = field(default_factory=bool)

    # Per-dunder cache of methods collected from `allowed_operations`.
    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Allow attribute access when attribute lookup was not customised.

        Both ``__getattribute__`` and ``__getattr__`` (if present) of the
        type of ``value`` must be allow-listed. Properties are additionally
        required to be identical to the property of an allow-listed class.
        """
        allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)

        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattr__",
        )

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if not accept:
            return False

        # We still need to check for overridden properties.
        value_class = type(value)
        if not hasattr(value_class, attr):
            return True

        class_attr_val = getattr(value_class, attr)
        if not isinstance(class_attr_val, property):
            return True

        # Properties in allowed types are ok (although we do not include any
        # properties in our default allow list currently).
        if value_class in self.allowed_getattr:
            return True  # pragma: no cover

        # Properties in subclasses of allowed types may be ok if not changed.
        # Every external entry is consulted: the set has no defined order, so
        # deciding on the first entry alone would make the answer arbitrary.
        for module_name, *access_path in allowed_getattr_external:
            try:
                external_class = _get_external(module_name, access_path)
                external_class_attr_val = getattr(external_class, attr)
            except (KeyError, AttributeError):
                continue  # pragma: no cover
            if class_attr_val == external_class_attr_val:
                return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
        if self.allow_getitem_on_types:
            # e.g. Union[str, int] or Literal[True, 1]
            if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
                return True
            # PEP 560 e.g. list[str]
            if isinstance(value, type) and hasattr(value, "__class_getitem__"):
                return True
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Allow an operation when every dunder is original on every operand."""
        allowed_operations_external = _coerce_path_to_tuples(
            self.allowed_operations_external
        )
        objects = [a]
        if b is not None:
            objects.append(b)
        # A generator lets `all` stop at the first rejected operand.
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and cache) the ``dunder`` methods of ``allowed_operations``."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect attribute ``name`` from ``classes``, skipping missing/falsy ones."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }
376
377
class _DummyNamedTuple(NamedTuple):
    """Used internally to retrieve methods of named tuple instance.

    The class declares no fields.
    """
380
381
# Identifiers of the available evaluation policies; `guarded_eval` refuses to
# evaluate under "forbidden" and falls back to plain `eval` under "dangerous".
EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]
383
384
@dataclass
class EvaluationContext:
    """Namespaces and settings used while evaluating code."""

    #: Local namespace
    locals: dict
    #: Global namespace
    globals: dict
    #: Evaluation policy identifier
    evaluation: EvaluationPolicyName = "forbidden"
    #: Whether the evaluation of code takes place inside of a subscript.
    #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
    in_subscript: bool = False
    #: Auto import method
    auto_import: Callable[[list[str]], ModuleType] | None = None
    #: Overrides for evaluation policy
    policy_overrides: dict = field(default_factory=dict)
    #: Transient local namespace used to store mocks
    transient_locals: dict = field(default_factory=dict)
    #: Transients of class level
    class_transients: dict | None = None
    #: Instance variable name used in the method definition
    instance_arg_name: str | None = None

    def replace(self, /, **changes):
        """Return a new copy of the context, with specified changes"""
        return dataclasses.replace(self, **changes)
410
411
412class _IdentitySubscript:
413 """Returns the key itself when item is requested via subscript."""
414
415 def __getitem__(self, key):
416 return key
417
418
# Instance stored in the local namespace under SUBSCRIPT_MARKER by `guarded_eval`.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name that `guarded_eval` wraps around subscript code: `NAME[code]`.
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# Signature without parameters.
# NOTE(review): presumably a placeholder for callables whose signature cannot
# be determined - usage is not visible here, confirm.
UNKNOWN_SIGNATURE = Signature()
# Sentinel meaning "no result was produced", distinct from a legitimate `None`.
NOT_EVALUATED = object()
423
424
class GuardRejection(Exception):
    """Raised when the guard refuses to carry out an evaluation step."""
429
430
def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate ``code`` under the restrictions described by ``context``.

    - ``forbidden`` policy: nothing is evaluated, :class:`GuardRejection`
      is raised;
    - ``dangerous`` policy: the code is handed to the built-in :func:`eval`
      with the context namespaces;
    - any other policy: the code is parsed and :func:`eval_node` is called
      on the resulting AST.

    When ``context.in_subscript`` is set, empty code evaluates to an empty
    tuple and non-empty code is evaluated as the content of a subscript.
    """
    mode = context.evaluation

    if mode == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: not using `ast.literal_eval` as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # slice syntax (`:`) is only valid inside a subscript, so the code is
        # wrapped in a subscript of a marker object that returns the key
        # untouched; this lets the parser accept it while the `__getitem__`
        # call itself has no effect on the result
        if not code:
            return tuple()
        subscript_locals = context.locals.copy()
        subscript_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = SUBSCRIPT_MARKER + "[" + code + "]"
        context = context.replace(locals=subscript_locals)

    if mode == "dangerous":
        # NOTE: unrestricted `eval` - only reachable when the caller opted
        # into the "dangerous" policy.
        return eval(code, context.globals, context.locals)

    tree = ast.parse(code, mode="exec")
    return eval_node(tree, context)
465
466
# Maps binary operator AST node types to the dunder method implementing them.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}

# Maps comparison AST node types to the dunder names checked for them;
# the first entry of each tuple names the comparison itself.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

# Maps unary operator AST node types to the dunder names checked for them.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    # NOTE(review): `__not__` exists on the `operator` module but is not a
    # method of ordinary objects - confirm how `not x` is meant to be handled.
    ast.Not: ("__not__",),
}
501
502
class ImpersonatingDuck:
    """A dummy class used to create objects of other classes without calling their ``__init__``"""

    # no-op: override __class__ to impersonate
    # (the class deliberately defines no members of its own)
507
508
509class _Duck:
510 """A dummy class used to create objects pretending to have given attributes"""
511
512 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None):
513 self.attributes = attributes if attributes is not None else {}
514 self.items = items if items is not None else {}
515
516 def __getattr__(self, attr: str):
517 return self.attributes[attr]
518
519 def __hasattr__(self, attr: str):
520 return attr in self.attributes
521
522 def __dir__(self):
523 return [*dir(super), *self.attributes]
524
525 def __getitem__(self, key: str):
526 return self.items[key]
527
528 def __hasitem__(self, key: str):
529 return self.items[key]
530
531 def _ipython_key_completions_(self):
532 return self.items.keys()
533
534
535def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]:
536 dunder = None
537 for op, candidate_dunder in dunders.items():
538 if isinstance(node_op, op):
539 dunder = candidate_dunder
540 return dunder
541
542
def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Return a copy of the policy named by ``context`` with overrides applied.

    Only overrides naming an attribute the policy already has are applied;
    the remaining ones are ignored.
    """
    registered = EVALUATION_POLICIES[context.evaluation]
    # NOTE(review): `copy` is shallow, so mutable attributes of the copy are
    # the same objects as on the registered policy.
    policy = copy(registered)

    for name, override in context.policy_overrides.items():
        if not hasattr(policy, name):
            continue
        setattr(policy, name, override)
    return policy
550
551
def _validate_policy_overrides(
    policy_name: EvaluationPolicyName, policy_overrides: dict
) -> bool:
    """Warn about overrides that do not apply to the named policy.

    Returns ``True`` when every override key is an attribute of the policy,
    ``False`` (after one warning per offending key) otherwise.
    """
    policy = EVALUATION_POLICIES[policy_name]

    valid = True
    for key in policy_overrides:
        if hasattr(policy, key):
            continue
        warnings.warn(
            f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
        )
        valid = False
    return valid
565
566
def _handle_assign(node: ast.Assign, context: EvaluationContext):
    """Record the effect of an assignment in the transient namespaces.

    The right-hand side is evaluated once and stored for every target:

    - names go to ``context.transient_locals``;
    - attributes of the instance argument (see
      ``_is_instance_attribute_assignment``) go to ``context.class_transients``;
    - tuple/list targets are unpacked element-wise, with at most one starred
      element receiving the middle part as a list;
    - subscript targets replace the container with a ``_Duck`` that exposes
      the names from ``dir(container)`` and the items of the container (both
      subject to the policy) plus the assigned item. Subscripts of anything
      but a name or an instance attribute are skipped.

    Always returns ``None``.

    Raises:
    * `NameError` if the container of a subscript target cannot be found.
    """
    value = eval_node(node.value, context)
    transient_locals = context.transient_locals
    policy = get_policy(context)
    class_transients = context.class_transients

    def store(target, item):
        # Single place deciding between instance attribute and plain name.
        if _is_instance_attribute_assignment(target, context):
            class_transients[target.attr] = item
        else:
            transient_locals[target.id] = item

    for target in node.targets:
        if isinstance(target, (ast.Tuple, ast.List)):
            # Handle unpacking assignment
            values = list(value)
            targets = target.elts
            starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]

            # Unified handling: treat no starred as starred at end
            star_or_last_idx = starred[0] if starred else len(targets)

            # Before starred
            for i in range(star_or_last_idx):
                store(targets[i], values[i])

            # Starred if exists
            if starred:
                end = len(values) - (len(targets) - star_or_last_idx - 1)
                # The actual target is wrapped in `ast.Starred`; unwrap it so
                # that `*self.x` is recognised as an instance attribute.
                store(targets[star_or_last_idx].value, values[star_or_last_idx:end])

            # After starred
            for i in range(star_or_last_idx + 1, len(targets)):
                store(targets[i], values[len(values) - (len(targets) - i)])
        elif isinstance(target, ast.Subscript):
            if isinstance(target.value, ast.Name):
                name = target.value.id
                container = transient_locals.get(name)
                if container is None:
                    container = context.locals.get(name)
                if container is None:
                    container = context.globals.get(name)
                if container is None:
                    raise NameError(
                        f"{name} not found in locals, globals, nor builtins"
                    )
                storage_dict = transient_locals
                storage_key = name
            elif isinstance(
                target.value, ast.Attribute
            ) and _is_instance_attribute_assignment(target.value, context):
                attr = target.value.attr
                container = class_transients.get(attr, None)
                if container is None:
                    raise NameError(f"{attr} not found in class transients")
                storage_dict = class_transients
                storage_key = attr
            else:
                # Unsupported container expression: skip this target only, so
                # the remaining targets of a chained assignment are kept.
                continue

            key = eval_node(target.slice, context)
            attributes = (
                dict.fromkeys(dir(container))
                if policy.can_call(container.__dir__)
                else {}
            )
            items = {}

            if policy.can_get_item(container, None):
                try:
                    items = dict(container.items())
                except Exception:
                    # best effort: the container may not be a mapping
                    pass

            items[key] = value
            duck_container = _Duck(attributes=attributes, items=items)
            storage_dict[storage_key] = duck_container
        else:
            store(target, value)
    return None
662
663
def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
    """Evaluate the arguments of a call node.

    Returns ``(args, kwargs)``: the evaluated positional arguments as a list
    and the keyword arguments as a dict. A ``**mapping`` argument contributes
    the pairs yielded by the ``items()`` of its evaluated value; later
    entries replace earlier ones with the same key.
    """
    args = []
    for positional in node.args:
        args.append(eval_node(positional, context))

    kwargs = {}
    for keyword in node.keywords:
        evaluated = eval_node(keyword.value, context)
        if keyword.arg:
            kwargs[keyword.arg] = evaluated
        else:
            # `**mapping`: keyword nodes without a name carry a mapping
            for k, v in evaluated.items():
                kwargs[k] = v
    return args, kwargs
676
677
def _is_instance_attribute_assignment(
    target: ast.AST, context: EvaluationContext
) -> bool:
    """Tell whether ``target`` is ``<instance arg>.<attr>``.

    This requires the context to carry both class transients and the name of
    the instance argument, and ``target`` to be an attribute node whose
    value is a name node equal to that instance argument name.
    """
    if context.class_transients is None or context.instance_arg_name is None:
        return False
    if not isinstance(target, ast.Attribute):
        return False
    owner = getattr(target, "value", None)
    if not isinstance(owner, ast.Name):
        return False
    return getattr(owner, "id", None) == context.instance_arg_name
689
690
691def _get_coroutine_attributes() -> dict[str, Optional[object]]:
692 async def _dummy():
693 return None
694
695 coro = _dummy()
696 try:
697 return {attr: getattr(coro, attr, None) for attr in dir(coro)}
698 finally:
699 coro.close()
700
701
702def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
703 """Evaluate AST node in provided context.
704
705 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.
706
707 Does not evaluate actions that always have side effects:
708
709 - class definitions (``class sth: ...``)
710 - function definitions (``def sth: ...``)
711 - variable assignments (``x = 1``)
712 - augmented assignments (``x += 1``)
713 - deletions (``del x``)
714
715 Does not evaluate operations which do not return values:
716
717 - assertions (``assert x``)
718 - pass (``pass``)
719 - imports (``import x``)
720 - control flow:
721
722 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
723 - loops (``for`` and ``while``)
724 - exception handling
725
726 The purpose of this function is to guard against unwanted side-effects;
727 it does not give guarantees on protection from malicious code execution.
728 """
729 policy = get_policy(context)
730
731 if node is None:
732 return None
733 if isinstance(node, (ast.Interactive, ast.Module)):
734 result = None
735 for child_node in node.body:
736 result = eval_node(child_node, context)
737 return result
738 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
739 is_async = isinstance(node, ast.AsyncFunctionDef)
740 func_locals = context.transient_locals.copy()
741 func_context = context.replace(transient_locals=func_locals)
742 is_property = False
743 is_static = False
744 is_classmethod = False
745 for decorator_node in node.decorator_list:
746 try:
747 decorator = eval_node(decorator_node, context)
748 except NameError:
749 # if the decorator is not yet defined this is fine
750 # especialy because we don't handle imports yet
751 continue
752 if decorator is property:
753 is_property = True
754 elif decorator is staticmethod:
755 is_static = True
756 elif decorator is classmethod:
757 is_classmethod = True
758
759 if func_context.class_transients is not None:
760 if not is_static and not is_classmethod:
761 func_context.instance_arg_name = (
762 node.args.args[0].arg if node.args.args else None
763 )
764
765 return_type = eval_node(node.returns, context=context)
766
767 for child_node in node.body:
768 eval_node(child_node, func_context)
769
770 if is_property:
771 if return_type is not None:
772 context.transient_locals[node.name] = _resolve_annotation(
773 return_type, context
774 )
775 else:
776 return_value = _infer_return_value(node, func_context)
777 context.transient_locals[node.name] = return_value
778
779 return None
780
781 def dummy_function(*args, **kwargs):
782 pass
783
784 if return_type is not None:
785 dummy_function.__annotations__["return"] = return_type
786 else:
787 inferred_return = _infer_return_value(node, func_context)
788 if inferred_return is not None:
789 dummy_function.__inferred_return__ = inferred_return
790
791 dummy_function.__name__ = node.name
792 dummy_function.__node__ = node
793 dummy_function.__is_async__ = is_async
794 context.transient_locals[node.name] = dummy_function
795 return None
796 if isinstance(node, ast.Lambda):
797
798 def dummy_function(*args, **kwargs):
799 pass
800
801 dummy_function.__inferred_return__ = eval_node(node.body, context)
802 return dummy_function
803 if isinstance(node, ast.ClassDef):
804 # TODO support class decorators?
805 class_locals = {}
806 outer_locals = context.locals.copy()
807 outer_locals.update(context.transient_locals)
808 class_context = context.replace(
809 transient_locals=class_locals, locals=outer_locals
810 )
811 class_context.class_transients = class_locals
812 for child_node in node.body:
813 eval_node(child_node, class_context)
814 bases = tuple([eval_node(base, context) for base in node.bases])
815 dummy_class = type(node.name, bases, class_locals)
816 context.transient_locals[node.name] = dummy_class
817 return None
818 if isinstance(node, ast.Await):
819 value = eval_node(node.value, context)
820 if hasattr(value, "__awaited_type__"):
821 return value.__awaited_type__
822 return value
823 if isinstance(node, ast.While):
824 loop_locals = context.transient_locals.copy()
825 loop_context = context.replace(transient_locals=loop_locals)
826
827 result = None
828 for stmt in node.body:
829 result = eval_node(stmt, loop_context)
830
831 policy = get_policy(context)
832 merged_locals = _merge_dicts_by_key(
833 [loop_locals, context.transient_locals.copy()], policy
834 )
835 context.transient_locals.update(merged_locals)
836
837 return result
838 if isinstance(node, ast.For):
839 try:
840 iterable = eval_node(node.iter, context)
841 except Exception:
842 iterable = None
843
844 sample = None
845 if iterable is not None:
846 try:
847 if policy.can_call(getattr(iterable, "__iter__", None)):
848 sample = next(iter(iterable))
849 except Exception:
850 sample = None
851
852 loop_locals = context.transient_locals.copy()
853 loop_context = context.replace(transient_locals=loop_locals)
854
855 if sample is not None:
856 try:
857 fake_assign = ast.Assign(
858 targets=[node.target], value=ast.Constant(value=sample)
859 )
860 _handle_assign(fake_assign, loop_context)
861 except Exception:
862 pass
863
864 result = None
865 for stmt in node.body:
866 result = eval_node(stmt, loop_context)
867
868 policy = get_policy(context)
869 merged_locals = _merge_dicts_by_key(
870 [loop_locals, context.transient_locals.copy()], policy
871 )
872 context.transient_locals.update(merged_locals)
873
874 return result
875 if isinstance(node, ast.If):
876 branches = []
877 current = node
878 result = None
879 while True:
880 branch_locals = context.transient_locals.copy()
881 branch_context = context.replace(transient_locals=branch_locals)
882 for stmt in current.body:
883 result = eval_node(stmt, branch_context)
884 branches.append(branch_locals)
885 if not current.orelse:
886 break
887 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If):
888 # It's an elif - continue loop
889 current = current.orelse[0]
890 else:
891 # It's an else block - process and break
892 else_locals = context.transient_locals.copy()
893 else_context = context.replace(transient_locals=else_locals)
894 for stmt in current.orelse:
895 result = eval_node(stmt, else_context)
896 branches.append(else_locals)
897 break
898 branches.append(context.transient_locals.copy())
899 policy = get_policy(context)
900 merged_locals = _merge_dicts_by_key(branches, policy)
901 context.transient_locals.update(merged_locals)
902 return result
903 if isinstance(node, ast.Assign):
904 return _handle_assign(node, context)
905 if isinstance(node, ast.AnnAssign):
906 if node.simple:
907 value = _resolve_annotation(eval_node(node.annotation, context), context)
908 context.transient_locals[node.target.id] = value
909 # Handle non-simple annotated assignments only for self.x: type = value
910 if _is_instance_attribute_assignment(node.target, context):
911 value = _resolve_annotation(eval_node(node.annotation, context), context)
912 context.class_transients[node.target.attr] = value
913 return None
914 if isinstance(node, ast.Expression):
915 return eval_node(node.body, context)
916 if isinstance(node, ast.Expr):
917 return eval_node(node.value, context)
918 if isinstance(node, ast.Pass):
919 return None
920 if isinstance(node, ast.Import):
921 # TODO: populate transient_locals
922 return None
923 if isinstance(node, (ast.AugAssign, ast.Delete)):
924 return None
925 if isinstance(node, (ast.Global, ast.Nonlocal)):
926 return None
927 if isinstance(node, ast.BinOp):
928 left = eval_node(node.left, context)
929 right = eval_node(node.right, context)
930 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
931 if dunders:
932 if policy.can_operate(dunders, left, right):
933 return getattr(left, dunders[0])(right)
934 else:
935 raise GuardRejection(
936 f"Operation (`{dunders}`) for",
937 type(left),
938 f"not allowed in {context.evaluation} mode",
939 )
940 if isinstance(node, ast.Compare):
941 left = eval_node(node.left, context)
942 all_true = True
943 negate = False
944 for op, right in zip(node.ops, node.comparators):
945 right = eval_node(right, context)
946 dunder = None
947 dunders = _find_dunder(op, COMP_OP_DUNDERS)
948 if not dunders:
949 if isinstance(op, ast.NotIn):
950 dunders = COMP_OP_DUNDERS[ast.In]
951 negate = True
952 if isinstance(op, ast.Is):
953 dunder = "is_"
954 if isinstance(op, ast.IsNot):
955 dunder = "is_"
956 negate = True
957 if not dunder and dunders:
958 dunder = dunders[0]
959 if dunder:
960 a, b = (right, left) if dunder == "__contains__" else (left, right)
961 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
962 result = getattr(operator, dunder)(a, b)
963 if negate:
964 result = not result
965 if not result:
966 all_true = False
967 left = right
968 else:
969 raise GuardRejection(
970 f"Comparison (`{dunder}`) for",
971 type(left),
972 f"not allowed in {context.evaluation} mode",
973 )
974 else:
975 raise ValueError(
976 f"Comparison `{dunder}` not supported"
977 ) # pragma: no cover
978 return all_true
979 if isinstance(node, ast.Constant):
980 return node.value
981 if isinstance(node, ast.Tuple):
982 return tuple(eval_node(e, context) for e in node.elts)
983 if isinstance(node, ast.List):
984 return [eval_node(e, context) for e in node.elts]
985 if isinstance(node, ast.Set):
986 return {eval_node(e, context) for e in node.elts}
987 if isinstance(node, ast.Dict):
988 return dict(
989 zip(
990 [eval_node(k, context) for k in node.keys],
991 [eval_node(v, context) for v in node.values],
992 )
993 )
994 if isinstance(node, ast.Slice):
995 return slice(
996 eval_node(node.lower, context),
997 eval_node(node.upper, context),
998 eval_node(node.step, context),
999 )
1000 if isinstance(node, ast.UnaryOp):
1001 value = eval_node(node.operand, context)
1002 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
1003 if dunders:
1004 if policy.can_operate(dunders, value):
1005 try:
1006 return getattr(value, dunders[0])()
1007 except AttributeError:
1008 raise TypeError(
1009 f"bad operand type for unary {node.op}: {type(value)}"
1010 )
1011 else:
1012 raise GuardRejection(
1013 f"Operation (`{dunders}`) for",
1014 type(value),
1015 f"not allowed in {context.evaluation} mode",
1016 )
1017 if isinstance(node, ast.Subscript):
1018 value = eval_node(node.value, context)
1019 slice_ = eval_node(node.slice, context)
1020 if policy.can_get_item(value, slice_):
1021 return value[slice_]
1022 raise GuardRejection(
1023 "Subscript access (`__getitem__`) for",
1024 type(value), # not joined to avoid calling `repr`
1025 f" not allowed in {context.evaluation} mode",
1026 )
1027 if isinstance(node, ast.Name):
1028 return _eval_node_name(node.id, context)
1029 if isinstance(node, ast.Attribute):
1030 if (
1031 context.class_transients is not None
1032 and isinstance(node.value, ast.Name)
1033 and node.value.id == context.instance_arg_name
1034 ):
1035 return context.class_transients.get(node.attr)
1036 value = eval_node(node.value, context)
1037 if policy.can_get_attr(value, node.attr):
1038 return getattr(value, node.attr)
1039 try:
1040 cls = (
1041 value if isinstance(value, type) else getattr(value, "__class__", None)
1042 )
1043 if cls is not None:
1044 resolved_hints = get_type_hints(
1045 cls,
1046 globalns=(context.globals or {}),
1047 localns=(context.locals or {}),
1048 )
1049 if node.attr in resolved_hints:
1050 annotated = resolved_hints[node.attr]
1051 return _resolve_annotation(annotated, context)
1052 except Exception:
1053 # Fall through to the guard rejection
1054 pass
1055 raise GuardRejection(
1056 "Attribute access (`__getattr__`) for",
1057 type(value), # not joined to avoid calling `repr`
1058 f"not allowed in {context.evaluation} mode",
1059 )
1060 if isinstance(node, ast.IfExp):
1061 test = eval_node(node.test, context)
1062 if test:
1063 return eval_node(node.body, context)
1064 else:
1065 return eval_node(node.orelse, context)
1066 if isinstance(node, ast.Call):
1067 func = eval_node(node.func, context)
1068 if policy.can_call(func):
1069 args, kwargs = _extract_args_and_kwargs(node, context)
1070 return func(*args, **kwargs)
1071 if isclass(func):
1072 # this code path gets entered when calling class e.g. `MyClass()`
1073 # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
1074 # Should return `MyClass` if `__new__` is not overridden,
1075 # otherwise whatever `__new__` return type is.
1076 overridden_return_type = _eval_return_type(func.__new__, node, context)
1077 if overridden_return_type is not NOT_EVALUATED:
1078 return overridden_return_type
1079 return _create_duck_for_heap_type(func)
1080 else:
1081 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED)
1082 return_type = _eval_return_type(func, node, context)
1083 if getattr(func, "__is_async__", False):
1084 awaited_type = (
1085 inferred_return if inferred_return is not None else return_type
1086 )
1087 coroutine_duck = _Duck(attributes=_get_coroutine_attributes())
1088 coroutine_duck.__awaited_type__ = awaited_type
1089 return coroutine_duck
1090 if inferred_return is not NOT_EVALUATED:
1091 return inferred_return
1092 if return_type is not NOT_EVALUATED:
1093 return return_type
1094 raise GuardRejection(
1095 "Call for",
1096 func, # not joined to avoid calling `repr`
1097 f"not allowed in {context.evaluation} mode",
1098 )
1099 if isinstance(node, ast.Assert):
1100 # message is always the second item, so if it is defined user would be completing
1101 # on the message, not on the assertion test
1102 if node.msg:
1103 return eval_node(node.msg, context)
1104 return eval_node(node.test, context)
1105 return None
1106
1107
def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
    """Fold several dictionaries into one, merging the values found under each key.

    A single-element list is returned as its only dictionary, untouched.
    Otherwise every key seen in any input maps to the ``_merge_values`` result
    of all values stored under that key.
    """
    if len(dicts) == 1:
        return dicts[0]

    seen_keys = {key for mapping in dicts for key in mapping.keys()}

    combined = {}
    for key in seen_keys:
        candidates = [mapping[key] for mapping in dicts if key in mapping]
        if not candidates:
            continue
        combined[key] = _merge_values(candidates, policy)

    return combined
1124
1125
def _merge_values(values, policy: EvaluationPolicy):
    """Combine several candidate values into one representative value.

    A single candidate is returned unchanged.  Otherwise attribute names
    (via ``dir``) and mapping entries (via ``items()``, or ``keys()`` with
    ``None`` placeholders) are gathered from every candidate for which the
    policy permits the corresponding call.  When all candidates share one type
    that is not mapping-like, the first candidate is returned, or the bare type
    for ``list``/``set``/``tuple``.  In every other case a ``_Duck`` exposing
    the gathered attributes and the recursively merged items is built.
    """
    if len(values) == 1:
        return values[0]

    distinct_types = {type(candidate) for candidate in values}
    attribute_names = set()
    entries_by_key = {}
    for candidate in values:
        if policy.can_call(candidate.__dir__):
            attribute_names.update(dir(candidate))
        # Probing for `items`/`keys` and iterating them is best-effort: any
        # failure just means this candidate contributes no (further) entries.
        try:
            if policy.can_call(candidate.items):
                for key, entry in candidate.items():
                    entries_by_key.setdefault(key, []).append(entry)
            elif policy.can_call(candidate.keys):
                for key in candidate.keys():
                    entries_by_key.setdefault(key, []).append(None)
        except Exception:
            pass

    merged_items = None
    if entries_by_key:
        merged_items = {}
        for key, entries in entries_by_key.items():
            if entries[0] is None:
                merged_items[key] = None
            else:
                merged_items[key] = _merge_values(entries, policy)

    if len(distinct_types) == 1:
        (shared_type,) = distinct_types
        first = values[0]
        if shared_type not in (dict,):
            mapping_like = hasattr(first, "__getitem__") and (
                hasattr(first, "items") or hasattr(first, "keys")
            )
            if not mapping_like:
                if shared_type in (list, set, tuple):
                    return shared_type
                return first

    return _Duck(attributes=dict.fromkeys(attribute_names), items=merged_items)
1174
1175
def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
    """Infer what a function returns by evaluating its ``return`` statements.

    Yields ``None`` when no return value could be collected, the value itself
    when there is exactly one, and the ``_merge_values`` combination otherwise.
    """
    candidates = _collect_return_values(node.body, context)

    if len(candidates) > 1:
        return _merge_values(candidates, get_policy(context))
    if candidates:
        return candidates[0]
    return None
1187
1188
1189def _collect_return_values(body, context):
1190 """Recursively collect return values from a list of AST statements."""
1191 return_values = []
1192 for stmt in body:
1193 if isinstance(stmt, ast.Return):
1194 if stmt.value is None:
1195 continue
1196 try:
1197 value = eval_node(stmt.value, context)
1198 if value is not None and value is not NOT_EVALUATED:
1199 return_values.append(value)
1200 except Exception:
1201 pass
1202 if isinstance(
1203 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda)
1204 ):
1205 continue
1206 elif hasattr(stmt, "body") and isinstance(stmt.body, list):
1207 return_values.extend(_collect_return_values(stmt.body, context))
1208 if isinstance(stmt, ast.Try):
1209 for h in stmt.handlers:
1210 if hasattr(h, "body"):
1211 return_values.extend(_collect_return_values(h.body, context))
1212 if hasattr(stmt, "orelse"):
1213 return_values.extend(_collect_return_values(stmt.orelse, context))
1214 if hasattr(stmt, "finalbody"):
1215 return_values.extend(_collect_return_values(stmt.finalbody, context))
1216 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list):
1217 return_values.extend(_collect_return_values(stmt.orelse, context))
1218 return return_values
1219
1220
def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Work out what calling ``func`` returns, based on its return annotation.

    Returns the resolved annotation (a built-in value or a duck), or the
    ``NOT_EVALUATED`` sentinel when ``func`` carries no return annotation.
    """
    try:
        func_signature = signature(func)
    except ValueError:
        func_signature = UNKNOWN_SIGNATURE

    return_annotation = func_signature.return_annotation
    # An annotation that was never stringized, or that `signature` already
    # resolved, is enough to know the return type.
    if return_annotation is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(
        return_annotation, context, func_signature, func, node
    )
1236
1237
def _eval_annotation(
    annotation: str,
    context: EvaluationContext,
):
    """Look up a stringized annotation by name; pass any other object through."""
    if not isinstance(annotation, str):
        return annotation
    return _eval_node_name(annotation, context)
1247
1248
class _GetItemDuck(dict):
    """Dict whose subscript lookups ignore the key and yield ``factory()``.

    Membership tests succeed for every key, regardless of what the
    underlying dict actually stores.
    """

    def __init__(self, factory, *args, **kwargs):
        # Everything after `factory` populates the underlying dict as usual.
        self._factory = factory
        super().__init__(*args, **kwargs)

    def __contains__(self, key):
        return True

    def __getitem__(self, key):
        return self._factory()
1261
1262
def _resolve_annotation(
    annotation: object | str,
    context: EvaluationContext,
    sig: Signature | None = None,
    func: Callable | None = None,
    node: ast.Call | None = None,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    Parameters
    ----------
    annotation
        The annotation object, or its name as a string (looked up through
        ``_eval_annotation``).
    context
        Evaluation context used for name lookups and nested evaluation.
    sig
        Signature of the callable being evaluated, if known; used to locate
        the ``AnyStr`` parameter.
    func
        The callable being evaluated, if any; used for ``Self`` and ``AnyStr``.
    node
        The call node being evaluated, if any; its positional arguments
        provide the value for ``AnyStr``.

    Returns
    -------
    A representative value or duck for the annotation.  ``None`` is returned
    for a ``None`` annotation, for a ``Literal`` with several values, and for
    ``AnyStr`` when the matching call argument cannot be determined.  The
    fallback branch may return ``NOT_EVALUATED`` when no duck can be created.
    """
    # Imported locally: PEP 604 unions (`X | Y`) report this as their origin.
    from types import UnionType

    if annotation is None:
        return None
    annotation = _eval_annotation(annotation, context)
    origin = get_origin(annotation)
    if annotation is Self and func and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
        # several possible literal values: nothing definite to return
        return None
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        index = None
        if func and hasattr(func, "__node__"):
            def_node = func.__node__
            for i, arg in enumerate(def_node.args.args):
                arg_annotation = arg.annotation
                # Only plain names can be looked up by identifier; attribute,
                # subscript or string annotations have no `.id`.
                if not isinstance(arg_annotation, ast.Name):
                    continue
                if _eval_annotation(arg_annotation.id, context) is AnyStr:
                    index = i
                    break
            is_bound_method = (
                isinstance(func, MethodType) and getattr(func, "__self__") is not None
            )
            # The definition lists the instance argument, the call does not.
            if index and is_bound_method:
                index -= 1
        elif sig:
            for i, (key, value) in enumerate(sig.parameters.items()):
                if value.annotation is AnyStr:
                    index = i
                    break
        if index is None or node is None:
            return None
        if index < 0 or index >= len(node.args):
            return None
        return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is set or origin is list:
        # only one type argument allowed
        element_attributes = dir(
            _resolve_annotation(get_args(annotation)[0], context, sig, func, node)
        )
        duck = _Duck(attributes=dict.fromkeys(element_attributes))
        return _Duck(
            attributes=dict.fromkeys(dir(origin())),
            # items are not strictly needed for set
            items=_GetItemDuck(lambda: duck),
        )
    elif origin is tuple:
        # multiple type arguments
        return tuple(
            _resolve_annotation(arg, context, sig, func, node)
            for arg in get_args(annotation)
        )
    elif origin is Union or origin is UnionType:
        # multiple type arguments; covers both `Union[X, Y]` and `X | Y`
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, context, sig, func, node)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        # the first argument is the underlying type; the rest is metadata
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, context, sig, func, node)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, context)
    else:
        return _eval_or_create_duck(annotation, context)
1358
1359
def _eval_node_name(node_id: str, context: EvaluationContext):
    """Look up the object bound to the name ``node_id``.

    Namespaces are consulted in order: transient locals, then locals, globals
    and builtins as far as the active policy allows, and finally the context's
    auto-import hook.  Raises ``GuardRejection`` when the policy permits
    neither locals nor globals access, and ``NameError`` otherwise.
    """
    policy = get_policy(context)

    transient = context.transient_locals
    if node_id in transient:
        return transient[node_id]

    if policy.allow_locals_access:
        local_namespace = context.locals
        if node_id in local_namespace:
            return local_namespace[node_id]

    if policy.allow_globals_access:
        global_namespace = context.globals
        if node_id in global_namespace:
            return global_namespace[node_id]

    # note: do not use __builtins__, it is implementation detail of cPython
    if policy.allow_builtins_access and hasattr(builtins, node_id):
        return getattr(builtins, node_id)

    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if not (policy.allow_globals_access or policy.allow_locals_access):
        raise GuardRejection(
            f"Namespace access not allowed in {context.evaluation} mode"
        )
    raise NameError(f"{node_id} not found in locals, globals, nor builtins")
1379
1380
def _eval_or_create_duck(duck_type, context: EvaluationContext):
    """Instantiate ``duck_type`` if the policy allows calling it, else imitate it."""
    if get_policy(context).can_call(duck_type):
        # allow-listed builtin used as a type annotation: build a real instance
        return duck_type()
    # custom class used as a type annotation: mock it
    return _create_duck_for_heap_type(duck_type)
1388
1389
def _create_duck_for_heap_type(duck_type):
    """Build an object that imitates an instance of ``duck_type`` (a duck).

    Returns the duck, or the NOT_EVALUATED sentinel if the duck could not
    be created.
    """
    impostor = ImpersonatingDuck()
    try:
        # Re-assigning `__class__` only works for heap types, not builtins.
        impostor.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return impostor
1403
1404
# Third-party types, identified by the components of their dotted path, that
# are handed to the "limited" policy as `allowed_getitem_external`.
SUPPORTED_EXTERNAL_GETITEM = {
    # numpy
    ("numpy", "ndarray"),
    ("numpy", "void"),
    # pandas containers
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    # pandas indexing helpers
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "core", "indexing", "_iLocIndexer"),
}
1413
1414
# Built-in, standard-library and internal helper types handed to the
# "limited" policy as `allowed_getitem`. It is also the starting point of
# `BUILTIN_GETATTR`, which unpacks it.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    type,  # for type annotations like list[str]
    _Duck,
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    _DummyNamedTuple,
    _IdentitySubscript,
}
1433
1434
def _list_methods(cls, source=None):
    """For use on immutable objects or with methods returning a copy

    Parameters
    ----------
    cls
        The class whose attributes are looked up.
    source
        Iterable of attribute names to fetch.  When omitted (``None``), every
        name reported by ``dir(cls)`` is used.  An explicitly empty iterable
        yields an empty list rather than falling back to all attributes, so
        an empty allow-list cannot silently expand to everything.

    Returns
    -------
    list
        ``getattr(cls, name)`` for each selected name, in iteration order.
    """
    names = dir(cls) if source is None else source
    return [getattr(cls, name) for name in names]
1438
1439
# Method names that are passed to `_list_methods` to pick the container
# methods placed on the call allow-list.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# Names that `set` shares with the immutable `frozenset`.
set_non_mutating_methods = set(dir(set)).intersection(dir(frozenset))


# The dict view classes, obtained from the views of an empty dict.
dict_keys: type[collections.abc.KeysView] = type(dict().keys())
dict_values: type = type(dict().values())
dict_items: type = type(dict().items())

NUMERICS = {int, float, complex}
1450
# Callables handed to the "limited" policy as `allowed_calls`.
# For each supported type the set lists the constructor, `__iter__`, and
# methods selected through `_list_methods`: every attribute for bytes,
# frozenset, str, tuple, bool and the numeric types, and only the named
# non-mutating methods for the mutable containers.
ALLOWED_CALLS = {
    # bytes
    bytes,
    *_list_methods(bytes),
    bytes.__iter__,
    # dict and its views
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict.__iter__,
    dict_keys.__iter__,
    dict_values.__iter__,
    dict_items.__iter__,
    dict_keys.isdisjoint,
    # list
    list,
    *_list_methods(list, list_non_mutating_methods),
    list.__iter__,
    # set: only the names it shares with frozenset
    set,
    *_list_methods(set, set_non_mutating_methods),
    set.__iter__,
    # frozenset
    frozenset,
    *_list_methods(frozenset),
    frozenset.__iter__,
    # range
    range,
    range.__iter__,
    # str
    str,
    *_list_methods(str),
    str.__iter__,
    # tuple
    tuple,
    *_list_methods(tuple),
    tuple.__iter__,
    # bool and numeric types
    bool,
    *_list_methods(bool),
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    # `collections` containers, using the list/dict/str method name lists
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.deque.__iter__,
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.defaultdict.__iter__,
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.OrderedDict.__iter__,
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserDict.__iter__,
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserList.__iter__,
    collections.UserString,
    *_list_methods(collections.UserString, dir(str)),
    collections.UserString.__iter__,
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.__iter__,
    collections.Counter.elements,
    collections.Counter.most_common,
    # `__dir__` implementations
    object.__dir__,
    type.__dir__,
    _Duck.__dir__,
}
1510
# Types handed to the "limited" policy as `allowed_getattr`: everything in
# `BUILTIN_GETITEM` plus the additional types listed below.
BUILTIN_GETATTR: set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Handed to the "limited" policy as `allowed_operations`; a separate set
# object that currently has the same members as `BUILTIN_GETATTR`.
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
1525
# Evaluation policies available by name.
EVALUATION_POLICIES = {
    # "minimal": only builtins access is enabled; namespace, item and
    # attribute access, calls and operations are all switched off.
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    # "limited": namespaces are readable; item access, attribute access,
    # operations and calls are restricted to the allow-lists defined above.
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_getitem_on_types=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    # "unsafe": every permission flag is enabled.
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}
1563
1564
# Names exported by `from <module> import *`. `_unbind_method` is listed
# explicitly, so it is exported despite its leading underscore.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]