1from copy import copy
2from inspect import isclass, signature, Signature, getmodule
3from typing import (
4 Annotated,
5 AnyStr,
6 Callable,
7 Literal,
8 NamedTuple,
9 NewType,
10 Optional,
11 Protocol,
12 Sequence,
13 TypeGuard,
14 Union,
15 get_args,
16 get_origin,
17 is_typeddict,
18)
19import ast
20import builtins
21import collections
22import dataclasses
23import operator
24import sys
25import typing
26import warnings
27from functools import cached_property
28from dataclasses import dataclass, field
29from types import MethodDescriptorType, ModuleType, MethodType
30
31from IPython.utils.decorators import undoc
32
33import types
34from typing import Self, LiteralString, get_type_hints
35
36if sys.version_info < (3, 12):
37 from typing_extensions import TypeAliasType
38else:
39 from typing import TypeAliasType
40
41
42@undoc
43class HasGetItem(Protocol):
44 def __getitem__(self, key) -> None:
45 ...
46
47
48@undoc
49class InstancesHaveGetItem(Protocol):
50 def __call__(self, *args, **kwargs) -> HasGetItem:
51 ...
52
53
54@undoc
55class HasGetAttr(Protocol):
56 def __getattr__(self, key) -> None:
57 ...
58
59
60@undoc
61class DoesNotHaveGetAttr(Protocol):
62 pass
63
64
65# By default `__getattr__` is not explicitly implemented on most objects
66MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
67
68
69def _unbind_method(func: Callable) -> Union[Callable, None]:
70 """Get unbound method for given bound method.
71
72 Returns None if cannot get unbound method, or method is already unbound.
73 """
74 owner = getattr(func, "__self__", None)
75 owner_class = type(owner)
76 name = getattr(func, "__name__", None)
77 instance_dict_overrides = getattr(owner, "__dict__", None)
78 if (
79 owner is not None
80 and name
81 and (
82 not instance_dict_overrides
83 or (instance_dict_overrides and name not in instance_dict_overrides)
84 )
85 ):
86 return getattr(owner_class, name)
87 return None
88
89
90@undoc
91@dataclass
92class EvaluationPolicy:
93 """Definition of evaluation policy."""
94
95 allow_locals_access: bool = False
96 allow_globals_access: bool = False
97 allow_item_access: bool = False
98 allow_attr_access: bool = False
99 allow_builtins_access: bool = False
100 allow_all_operations: bool = False
101 allow_any_calls: bool = False
102 allow_auto_import: bool = False
103 allowed_calls: set[Callable] = field(default_factory=set)
104
105 def can_get_item(self, value, item):
106 return self.allow_item_access
107
108 def can_get_attr(self, value, attr):
109 return self.allow_attr_access
110
111 def can_operate(self, dunders: tuple[str, ...], a, b=None):
112 if self.allow_all_operations:
113 return True
114
115 def can_call(self, func):
116 if self.allow_any_calls:
117 return True
118
119 if func in self.allowed_calls:
120 return True
121
122 owner_method = _unbind_method(func)
123
124 if owner_method and owner_method in self.allowed_calls:
125 return True
126
127
128def _get_external(module_name: str, access_path: Sequence[str]):
129 """Get value from external module given a dotted access path.
130
131 Only gets value if the module is already imported.
132
133 Raises:
134 * `KeyError` if module is removed not found, and
135 * `AttributeError` if access path does not match an exported object
136 """
137 try:
138 member_type = sys.modules[module_name]
139 # standard module
140 for attr in access_path:
141 member_type = getattr(member_type, attr)
142 return member_type
143 except (KeyError, AttributeError):
144 # handle modules in namespace packages
145 module_path = ".".join([module_name, *access_path])
146 if module_path in sys.modules:
147 return sys.modules[module_path]
148 raise
149
150
151def _has_original_dunder_external(
152 value,
153 module_name: str,
154 access_path: Sequence[str],
155 method_name: str,
156):
157 if module_name not in sys.modules:
158 full_module_path = ".".join([module_name, *access_path])
159 if full_module_path not in sys.modules:
160 # LBYLB as it is faster
161 return False
162 try:
163 member_type = _get_external(module_name, access_path)
164 value_type = type(value)
165 if type(value) == member_type:
166 return True
167 if isinstance(member_type, ModuleType):
168 value_module = getmodule(value_type)
169 if not value_module or not value_module.__name__:
170 return False
171 if (
172 value_module.__name__ == member_type.__name__
173 or value_module.__name__.startswith(member_type.__name__ + ".")
174 ):
175 return True
176 if method_name == "__getattribute__":
177 # we have to short-circuit here due to an unresolved issue in
178 # `isinstance` implementation: https://bugs.python.org/issue32683
179 return False
180 if not isinstance(member_type, ModuleType) and isinstance(value, member_type):
181 method = getattr(value_type, method_name, None)
182 member_method = getattr(member_type, method_name, None)
183 if member_method == method:
184 return True
185 if isinstance(member_type, ModuleType):
186 method = getattr(value_type, method_name, None)
187 for base_class in value_type.__mro__[1:]:
188 base_module = getmodule(base_class)
189 if base_module and (
190 base_module.__name__ == member_type.__name__
191 or base_module.__name__.startswith(member_type.__name__ + ".")
192 ):
193 # Check if the method comes from this trusted base class
194 base_method = getattr(base_class, method_name, None)
195 if base_method is not None and base_method == method:
196 return True
197 except (AttributeError, KeyError):
198 return False
199
200
201def _has_original_dunder(
202 value, allowed_types, allowed_methods, allowed_external, method_name
203):
204 # note: Python ignores `__getattr__`/`__getitem__` on instances,
205 # we only need to check at class level
206 value_type = type(value)
207
208 # strict type check passes → no need to check method
209 if value_type in allowed_types:
210 return True
211
212 method = getattr(value_type, method_name, None)
213
214 if method is None:
215 return None
216
217 if method in allowed_methods:
218 return True
219
220 for module_name, *access_path in allowed_external:
221 if _has_original_dunder_external(value, module_name, access_path, method_name):
222 return True
223
224 return False
225
226
227def _coerce_path_to_tuples(
228 allow_list: set[tuple[str, ...] | str],
229) -> set[tuple[str, ...]]:
230 """Replace dotted paths on the provided allow-list with tuples."""
231 return {
232 path if isinstance(path, tuple) else tuple(path.split("."))
233 for path in allow_list
234 }
235
236
237@undoc
238@dataclass
239class SelectivePolicy(EvaluationPolicy):
240 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
241 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)
242
243 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
244 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)
245
246 allowed_operations: set = field(default_factory=set)
247 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)
248
249 allow_getitem_on_types: bool = field(default_factory=bool)
250
251 _operation_methods_cache: dict[str, set[Callable]] = field(
252 default_factory=dict, init=False
253 )
254
255 def can_get_attr(self, value, attr):
256 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)
257
258 has_original_attribute = _has_original_dunder(
259 value,
260 allowed_types=self.allowed_getattr,
261 allowed_methods=self._getattribute_methods,
262 allowed_external=allowed_getattr_external,
263 method_name="__getattribute__",
264 )
265 has_original_attr = _has_original_dunder(
266 value,
267 allowed_types=self.allowed_getattr,
268 allowed_methods=self._getattr_methods,
269 allowed_external=allowed_getattr_external,
270 method_name="__getattr__",
271 )
272
273 accept = False
274
275 # Many objects do not have `__getattr__`, this is fine.
276 if has_original_attr is None and has_original_attribute:
277 accept = True
278 else:
279 # Accept objects without modifications to `__getattr__` and `__getattribute__`
280 accept = has_original_attr and has_original_attribute
281
282 if accept:
283 # We still need to check for overridden properties.
284
285 value_class = type(value)
286 if not hasattr(value_class, attr):
287 return True
288
289 class_attr_val = getattr(value_class, attr)
290 is_property = isinstance(class_attr_val, property)
291
292 if not is_property:
293 return True
294
295 # Properties in allowed types are ok (although we do not include any
296 # properties in our default allow list currently).
297 if type(value) in self.allowed_getattr:
298 return True # pragma: no cover
299
300 # Properties in subclasses of allowed types may be ok if not changed
301 for module_name, *access_path in allowed_getattr_external:
302 try:
303 external_class = _get_external(module_name, access_path)
304 external_class_attr_val = getattr(external_class, attr)
305 except (KeyError, AttributeError):
306 return False # pragma: no cover
307 return class_attr_val == external_class_attr_val
308
309 return False
310
311 def can_get_item(self, value, item):
312 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified."""
313 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
314 if self.allow_getitem_on_types:
315 # e.g. Union[str, int] or Literal[True, 1]
316 if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
317 return True
318 # PEP 560 e.g. list[str]
319 if isinstance(value, type) and hasattr(value, "__class_getitem__"):
320 return True
321 return _has_original_dunder(
322 value,
323 allowed_types=self.allowed_getitem,
324 allowed_methods=self._getitem_methods,
325 allowed_external=allowed_getitem_external,
326 method_name="__getitem__",
327 )
328
329 def can_operate(self, dunders: tuple[str, ...], a, b=None):
330 allowed_operations_external = _coerce_path_to_tuples(
331 self.allowed_operations_external
332 )
333 objects = [a]
334 if b is not None:
335 objects.append(b)
336 return all(
337 [
338 _has_original_dunder(
339 obj,
340 allowed_types=self.allowed_operations,
341 allowed_methods=self._operator_dunder_methods(dunder),
342 allowed_external=allowed_operations_external,
343 method_name=dunder,
344 )
345 for dunder in dunders
346 for obj in objects
347 ]
348 )
349
350 def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
351 if dunder not in self._operation_methods_cache:
352 self._operation_methods_cache[dunder] = self._safe_get_methods(
353 self.allowed_operations, dunder
354 )
355 return self._operation_methods_cache[dunder]
356
357 @cached_property
358 def _getitem_methods(self) -> set[Callable]:
359 return self._safe_get_methods(self.allowed_getitem, "__getitem__")
360
361 @cached_property
362 def _getattr_methods(self) -> set[Callable]:
363 return self._safe_get_methods(self.allowed_getattr, "__getattr__")
364
365 @cached_property
366 def _getattribute_methods(self) -> set[Callable]:
367 return self._safe_get_methods(self.allowed_getattr, "__getattribute__")
368
369 def _safe_get_methods(self, classes, name) -> set[Callable]:
370 return {
371 method
372 for class_ in classes
373 for method in [getattr(class_, name, None)]
374 if method
375 }
376
377
378class _DummyNamedTuple(NamedTuple):
379 """Used internally to retrieve methods of named tuple instance."""
380
381
382EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]
383
384
385@dataclass
386class EvaluationContext:
387 #: Local namespace
388 locals: dict
389 #: Global namespace
390 globals: dict
391 #: Evaluation policy identifier
392 evaluation: EvaluationPolicyName = "forbidden"
393 #: Whether the evaluation of code takes place inside of a subscript.
394 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
395 in_subscript: bool = False
396 #: Auto import method
397 auto_import: Callable[list[str], ModuleType] | None = None
398 #: Overrides for evaluation policy
399 policy_overrides: dict = field(default_factory=dict)
400 #: Transient local namespace used to store mocks
401 transient_locals: dict = field(default_factory=dict)
402 #: Transients of class level
403 class_transients: dict | None = None
404 #: Instance variable name used in the method definition
405 instance_arg_name: str | None = None
406 #: Currently associated value
407 #: Useful for adding items to _Duck on annotated assignment
408 current_value: ast.AST | None = None
409
410 def replace(self, /, **changes):
411 """Return a new copy of the context, with specified changes"""
412 return dataclasses.replace(self, **changes)
413
414
415class _IdentitySubscript:
416 """Returns the key itself when item is requested via subscript."""
417
418 def __getitem__(self, key):
419 return key
420
421
422IDENTITY_SUBSCRIPT = _IdentitySubscript()
423SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
424UNKNOWN_SIGNATURE = Signature()
425NOT_EVALUATED = object()
426
427
428class GuardRejection(Exception):
429 """Exception raised when guard rejects evaluation attempt."""
430
431 pass
432
433
434def guarded_eval(code: str, context: EvaluationContext):
435 """Evaluate provided code in the evaluation context.
436
437 If evaluation policy given by context is set to ``forbidden``
438 no evaluation will be performed; if it is set to ``dangerous``
439 standard :func:`eval` will be used; finally, for any other,
440 policy :func:`eval_node` will be called on parsed AST.
441 """
442 locals_ = context.locals
443
444 if context.evaluation == "forbidden":
445 raise GuardRejection("Forbidden mode")
446
447 # note: not using `ast.literal_eval` as it does not implement
448 # getitem at all, for example it fails on simple `[0][1]`
449
450 if context.in_subscript:
451 # syntactic sugar for ellipsis (:) is only available in subscripts
452 # so we need to trick the ast parser into thinking that we have
453 # a subscript, but we need to be able to later recognise that we did
454 # it so we can ignore the actual __getitem__ operation
455 if not code:
456 return tuple()
457 locals_ = locals_.copy()
458 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
459 code = SUBSCRIPT_MARKER + "[" + code + "]"
460 context = context.replace(locals=locals_)
461
462 if context.evaluation == "dangerous":
463 return eval(code, context.globals, context.locals)
464
465 node = ast.parse(code, mode="exec")
466
467 return eval_node(node, context)
468
469
470BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
471 ast.Add: ("__add__",),
472 ast.Sub: ("__sub__",),
473 ast.Mult: ("__mul__",),
474 ast.Div: ("__truediv__",),
475 ast.FloorDiv: ("__floordiv__",),
476 ast.Mod: ("__mod__",),
477 ast.Pow: ("__pow__",),
478 ast.LShift: ("__lshift__",),
479 ast.RShift: ("__rshift__",),
480 ast.BitOr: ("__or__",),
481 ast.BitXor: ("__xor__",),
482 ast.BitAnd: ("__and__",),
483 ast.MatMult: ("__matmul__",),
484}
485
486COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
487 ast.Eq: ("__eq__",),
488 ast.NotEq: ("__ne__", "__eq__"),
489 ast.Lt: ("__lt__", "__gt__"),
490 ast.LtE: ("__le__", "__ge__"),
491 ast.Gt: ("__gt__", "__lt__"),
492 ast.GtE: ("__ge__", "__le__"),
493 ast.In: ("__contains__",),
494 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
495}
496
497UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
498 ast.USub: ("__neg__",),
499 ast.UAdd: ("__pos__",),
500 # we have to check both __inv__ and __invert__!
501 ast.Invert: ("__invert__", "__inv__"),
502 ast.Not: ("__not__",),
503}
504
505GENERIC_CONTAINER_TYPES = (dict, list, set, tuple, frozenset)
506
507
508class ImpersonatingDuck:
509 """A dummy class used to create objects of other classes without calling their ``__init__``"""
510
511 # no-op: override __class__ to impersonate
512
513
514class _Duck:
515 """A dummy class used to create objects pretending to have given attributes"""
516
517 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None):
518 self.attributes = attributes if attributes is not None else {}
519 self.items = items if items is not None else {}
520
521 def __getattr__(self, attr: str):
522 return self.attributes[attr]
523
524 def __hasattr__(self, attr: str):
525 return attr in self.attributes
526
527 def __dir__(self):
528 return [*dir(super), *self.attributes]
529
530 def __getitem__(self, key: str):
531 return self.items[key]
532
533 def __hasitem__(self, key: str):
534 return self.items[key]
535
536 def _ipython_key_completions_(self):
537 return self.items.keys()
538
539
540def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]:
541 dunder = None
542 for op, candidate_dunder in dunders.items():
543 if isinstance(node_op, op):
544 dunder = candidate_dunder
545 return dunder
546
547
548def get_policy(context: EvaluationContext) -> EvaluationPolicy:
549 policy = copy(EVALUATION_POLICIES[context.evaluation])
550
551 for key, value in context.policy_overrides.items():
552 if hasattr(policy, key):
553 setattr(policy, key, value)
554 return policy
555
556
557def _validate_policy_overrides(
558 policy_name: EvaluationPolicyName, policy_overrides: dict
559) -> bool:
560 policy = EVALUATION_POLICIES[policy_name]
561
562 all_good = True
563 for key, value in policy_overrides.items():
564 if not hasattr(policy, key):
565 warnings.warn(
566 f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
567 )
568 all_good = False
569 return all_good
570
571
572def _is_type_annotation(obj) -> bool:
573 """
574 Returns True if obj is a type annotation, False otherwise.
575 """
576 if isinstance(obj, type):
577 return True
578 if isinstance(obj, types.GenericAlias):
579 return True
580 if hasattr(types, "UnionType") and isinstance(obj, types.UnionType):
581 return True
582 if isinstance(obj, (typing._SpecialForm, typing._BaseGenericAlias)):
583 return True
584 if isinstance(obj, typing.TypeVar):
585 return True
586 # Types that support __class_getitem__
587 if isinstance(obj, type) and hasattr(obj, "__class_getitem__"):
588 return True
589 # Fallback: check if get_origin returns something
590 if hasattr(typing, "get_origin") and get_origin(obj) is not None:
591 return True
592
593 return False
594
595
596def _handle_assign(node: ast.Assign, context: EvaluationContext):
597 value = eval_node(node.value, context)
598 transient_locals = context.transient_locals
599 policy = get_policy(context)
600 class_transients = context.class_transients
601 for target in node.targets:
602 if isinstance(target, (ast.Tuple, ast.List)):
603 # Handle unpacking assignment
604 values = list(value)
605 targets = target.elts
606 starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]
607
608 # Unified handling: treat no starred as starred at end
609 star_or_last_idx = starred[0] if starred else len(targets)
610
611 # Before starred
612 for i in range(star_or_last_idx):
613 # Check for self.x assignment
614 if _is_instance_attribute_assignment(targets[i], context):
615 class_transients[targets[i].attr] = values[i]
616 else:
617 transient_locals[targets[i].id] = values[i]
618
619 # Starred if exists
620 if starred:
621 end = len(values) - (len(targets) - star_or_last_idx - 1)
622 if _is_instance_attribute_assignment(
623 targets[star_or_last_idx], context
624 ):
625 class_transients[targets[star_or_last_idx].attr] = values[
626 star_or_last_idx:end
627 ]
628 else:
629 transient_locals[targets[star_or_last_idx].value.id] = values[
630 star_or_last_idx:end
631 ]
632
633 # After starred
634 for i in range(star_or_last_idx + 1, len(targets)):
635 if _is_instance_attribute_assignment(targets[i], context):
636 class_transients[targets[i].attr] = values[
637 len(values) - (len(targets) - i)
638 ]
639 else:
640 transient_locals[targets[i].id] = values[
641 len(values) - (len(targets) - i)
642 ]
643 elif isinstance(target, ast.Subscript):
644 if isinstance(target.value, ast.Name):
645 name = target.value.id
646 container = transient_locals.get(name)
647 if container is None:
648 container = context.locals.get(name)
649 if container is None:
650 container = context.globals.get(name)
651 if container is None:
652 raise NameError(
653 f"{name} not found in locals, globals, nor builtins"
654 )
655 storage_dict = transient_locals
656 storage_key = name
657 elif isinstance(
658 target.value, ast.Attribute
659 ) and _is_instance_attribute_assignment(target.value, context):
660 attr = target.value.attr
661 container = class_transients.get(attr, None)
662 if container is None:
663 raise NameError(f"{attr} not found in class transients")
664 storage_dict = class_transients
665 storage_key = attr
666 else:
667 return
668
669 key = eval_node(target.slice, context)
670 attributes = (
671 dict.fromkeys(dir(container))
672 if policy.can_call(container.__dir__)
673 else {}
674 )
675 items = {}
676
677 if policy.can_get_item(container, None):
678 try:
679 items = dict(container.items())
680 except Exception:
681 pass
682
683 items[key] = value
684 duck_container = _Duck(attributes=attributes, items=items)
685 storage_dict[storage_key] = duck_container
686 elif _is_instance_attribute_assignment(target, context):
687 class_transients[target.attr] = value
688 else:
689 transient_locals[target.id] = value
690 return None
691
692
693def _handle_annassign(node, context):
694 context_with_value = context.replace(current_value=getattr(node, "value", None))
695 annotation_result = eval_node(node.annotation, context_with_value)
696 if _is_type_annotation(annotation_result):
697 annotation_value = _resolve_annotation(annotation_result, context)
698 # Use Value for generic types
699 use_value = (
700 isinstance(annotation_value, GENERIC_CONTAINER_TYPES) and node.value is not None
701 )
702 else:
703 annotation_value = annotation_result
704 use_value = False
705
706 # LOCAL VARIABLE
707 if getattr(node, "simple", False) and isinstance(node.target, ast.Name):
708 name = node.target.id
709 if use_value:
710 return _handle_assign(
711 ast.Assign(targets=[node.target], value=node.value), context
712 )
713 context.transient_locals[name] = annotation_value
714 return None
715
716 # INSTANCE ATTRIBUTE
717 if _is_instance_attribute_assignment(node.target, context):
718 attr = node.target.attr
719 if use_value:
720 return _handle_assign(
721 ast.Assign(targets=[node.target], value=node.value), context
722 )
723 context.class_transients[attr] = annotation_value
724 return None
725
726 return None
727
728def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
729 args = [eval_node(arg, context) for arg in node.args]
730 kwargs = {
731 k: v
732 for kw in node.keywords
733 for k, v in (
734 {kw.arg: eval_node(kw.value, context)}
735 if kw.arg
736 else eval_node(kw.value, context)
737 ).items()
738 }
739 return args, kwargs
740
741
742def _is_instance_attribute_assignment(
743 target: ast.AST, context: EvaluationContext
744) -> bool:
745 """Return True if target is an attribute access on the instance argument."""
746 return (
747 context.class_transients is not None
748 and context.instance_arg_name is not None
749 and isinstance(target, ast.Attribute)
750 and isinstance(getattr(target, "value", None), ast.Name)
751 and getattr(target.value, "id", None) == context.instance_arg_name
752 )
753
754
755def _get_coroutine_attributes() -> dict[str, Optional[object]]:
756 async def _dummy():
757 return None
758
759 coro = _dummy()
760 try:
761 return {attr: getattr(coro, attr, None) for attr in dir(coro)}
762 finally:
763 coro.close()
764
765
766def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
767 """Evaluate AST node in provided context.
768
769 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.
770
771 Does not evaluate actions that always have side effects:
772
773 - class definitions (``class sth: ...``)
774 - function definitions (``def sth: ...``)
775 - variable assignments (``x = 1``)
776 - augmented assignments (``x += 1``)
777 - deletions (``del x``)
778
779 Does not evaluate operations which do not return values:
780
781 - assertions (``assert x``)
782 - pass (``pass``)
783 - imports (``import x``)
784 - control flow:
785
786 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
787 - loops (``for`` and ``while``)
788 - exception handling
789
790 The purpose of this function is to guard against unwanted side-effects;
791 it does not give guarantees on protection from malicious code execution.
792 """
793 policy = get_policy(context)
794
795 if node is None:
796 return None
797 if isinstance(node, (ast.Interactive, ast.Module)):
798 result = None
799 for child_node in node.body:
800 result = eval_node(child_node, context)
801 return result
802 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
803 is_async = isinstance(node, ast.AsyncFunctionDef)
804 func_locals = context.transient_locals.copy()
805 func_context = context.replace(transient_locals=func_locals)
806 is_property = False
807 is_static = False
808 is_classmethod = False
809 for decorator_node in node.decorator_list:
810 try:
811 decorator = eval_node(decorator_node, context)
812 except NameError:
813 # if the decorator is not yet defined this is fine
814 # especialy because we don't handle imports yet
815 continue
816 if decorator is property:
817 is_property = True
818 elif decorator is staticmethod:
819 is_static = True
820 elif decorator is classmethod:
821 is_classmethod = True
822
823 if func_context.class_transients is not None:
824 if not is_static and not is_classmethod:
825 func_context.instance_arg_name = (
826 node.args.args[0].arg if node.args.args else None
827 )
828
829 return_type = eval_node(node.returns, context=context)
830
831 for child_node in node.body:
832 eval_node(child_node, func_context)
833
834 if is_property:
835 if return_type is not None:
836 if _is_type_annotation(return_type):
837 context.transient_locals[node.name] = _resolve_annotation(
838 return_type, context
839 )
840 else:
841 context.transient_locals[node.name] = return_type
842 else:
843 return_value = _infer_return_value(node, func_context)
844 context.transient_locals[node.name] = return_value
845
846 return None
847
848 def dummy_function(*args, **kwargs):
849 pass
850
851 if return_type is not None:
852 if _is_type_annotation(return_type):
853 dummy_function.__annotations__["return"] = return_type
854 else:
855 dummy_function.__inferred_return__ = return_type
856 else:
857 inferred_return = _infer_return_value(node, func_context)
858 if inferred_return is not None:
859 dummy_function.__inferred_return__ = inferred_return
860
861 dummy_function.__name__ = node.name
862 dummy_function.__node__ = node
863 dummy_function.__is_async__ = is_async
864 context.transient_locals[node.name] = dummy_function
865 return None
866 if isinstance(node, ast.Lambda):
867
868 def dummy_function(*args, **kwargs):
869 pass
870
871 dummy_function.__inferred_return__ = eval_node(node.body, context)
872 return dummy_function
873 if isinstance(node, ast.ClassDef):
874 # TODO support class decorators?
875 class_locals = {}
876 outer_locals = context.locals.copy()
877 outer_locals.update(context.transient_locals)
878 class_context = context.replace(
879 transient_locals=class_locals, locals=outer_locals
880 )
881 class_context.class_transients = class_locals
882 for child_node in node.body:
883 eval_node(child_node, class_context)
884 bases = tuple([eval_node(base, context) for base in node.bases])
885 dummy_class = type(node.name, bases, class_locals)
886 context.transient_locals[node.name] = dummy_class
887 return None
888 if isinstance(node, ast.Await):
889 value = eval_node(node.value, context)
890 if hasattr(value, "__awaited_type__"):
891 return value.__awaited_type__
892 return value
893 if isinstance(node, ast.While):
894 loop_locals = context.transient_locals.copy()
895 loop_context = context.replace(transient_locals=loop_locals)
896
897 result = None
898 for stmt in node.body:
899 result = eval_node(stmt, loop_context)
900
901 policy = get_policy(context)
902 merged_locals = _merge_dicts_by_key(
903 [loop_locals, context.transient_locals.copy()], policy
904 )
905 context.transient_locals.update(merged_locals)
906
907 return result
908 if isinstance(node, ast.For):
909 try:
910 iterable = eval_node(node.iter, context)
911 except Exception:
912 iterable = None
913
914 sample = None
915 if iterable is not None:
916 try:
917 if policy.can_call(getattr(iterable, "__iter__", None)):
918 sample = next(iter(iterable))
919 except Exception:
920 sample = None
921
922 loop_locals = context.transient_locals.copy()
923 loop_context = context.replace(transient_locals=loop_locals)
924
925 if sample is not None:
926 try:
927 fake_assign = ast.Assign(
928 targets=[node.target], value=ast.Constant(value=sample)
929 )
930 _handle_assign(fake_assign, loop_context)
931 except Exception:
932 pass
933
934 result = None
935 for stmt in node.body:
936 result = eval_node(stmt, loop_context)
937
938 policy = get_policy(context)
939 merged_locals = _merge_dicts_by_key(
940 [loop_locals, context.transient_locals.copy()], policy
941 )
942 context.transient_locals.update(merged_locals)
943
944 return result
945 if isinstance(node, ast.If):
946 branches = []
947 current = node
948 result = None
949 while True:
950 branch_locals = context.transient_locals.copy()
951 branch_context = context.replace(transient_locals=branch_locals)
952 for stmt in current.body:
953 result = eval_node(stmt, branch_context)
954 branches.append(branch_locals)
955 if not current.orelse:
956 break
957 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If):
958 # It's an elif - continue loop
959 current = current.orelse[0]
960 else:
961 # It's an else block - process and break
962 else_locals = context.transient_locals.copy()
963 else_context = context.replace(transient_locals=else_locals)
964 for stmt in current.orelse:
965 result = eval_node(stmt, else_context)
966 branches.append(else_locals)
967 break
968 branches.append(context.transient_locals.copy())
969 policy = get_policy(context)
970 merged_locals = _merge_dicts_by_key(branches, policy)
971 context.transient_locals.update(merged_locals)
972 return result
973 if isinstance(node, ast.Assign):
974 return _handle_assign(node, context)
975 if isinstance(node, ast.AnnAssign):
976 return _handle_annassign(node, context)
977 if isinstance(node, ast.Expression):
978 return eval_node(node.body, context)
979 if isinstance(node, ast.Expr):
980 return eval_node(node.value, context)
981 if isinstance(node, ast.Pass):
982 return None
983 if isinstance(node, ast.Import):
984 # TODO: populate transient_locals
985 return None
986 if isinstance(node, (ast.AugAssign, ast.Delete)):
987 return None
988 if isinstance(node, (ast.Global, ast.Nonlocal)):
989 return None
990 if isinstance(node, ast.BinOp):
991 left = eval_node(node.left, context)
992 right = eval_node(node.right, context)
993 if (
994 isinstance(node.op, ast.BitOr)
995 and _is_type_annotation(left)
996 and _is_type_annotation(right)
997 ):
998 left_duck = (
999 _Duck(dict.fromkeys(dir(left)))
1000 if policy.can_call(left.__dir__)
1001 else _Duck()
1002 )
1003 right_duck = (
1004 _Duck(dict.fromkeys(dir(right)))
1005 if policy.can_call(right.__dir__)
1006 else _Duck()
1007 )
1008 value_node = context.current_value
1009 if value_node is not None and isinstance(value_node, ast.Dict):
1010 if dict in [left, right]:
1011 return _merge_values(
1012 [left_duck, right_duck, ast.literal_eval(value_node)],
1013 policy=get_policy(context),
1014 )
1015 return _merge_values([left_duck, right_duck], policy=get_policy(context))
1016 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
1017 if dunders:
1018 if policy.can_operate(dunders, left, right):
1019 return getattr(left, dunders[0])(right)
1020 else:
1021 raise GuardRejection(
1022 f"Operation (`{dunders}`) for",
1023 type(left),
1024 f"not allowed in {context.evaluation} mode",
1025 )
1026 if isinstance(node, ast.Compare):
1027 left = eval_node(node.left, context)
1028 all_true = True
1029 negate = False
1030 for op, right in zip(node.ops, node.comparators):
1031 right = eval_node(right, context)
1032 dunder = None
1033 dunders = _find_dunder(op, COMP_OP_DUNDERS)
1034 if not dunders:
1035 if isinstance(op, ast.NotIn):
1036 dunders = COMP_OP_DUNDERS[ast.In]
1037 negate = True
1038 if isinstance(op, ast.Is):
1039 dunder = "is_"
1040 if isinstance(op, ast.IsNot):
1041 dunder = "is_"
1042 negate = True
1043 if not dunder and dunders:
1044 dunder = dunders[0]
1045 if dunder:
1046 a, b = (right, left) if dunder == "__contains__" else (left, right)
1047 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
1048 result = getattr(operator, dunder)(a, b)
1049 if negate:
1050 result = not result
1051 if not result:
1052 all_true = False
1053 left = right
1054 else:
1055 raise GuardRejection(
1056 f"Comparison (`{dunder}`) for",
1057 type(left),
1058 f"not allowed in {context.evaluation} mode",
1059 )
1060 else:
1061 raise ValueError(
1062 f"Comparison `{dunder}` not supported"
1063 ) # pragma: no cover
1064 return all_true
1065 if isinstance(node, ast.Constant):
1066 return node.value
1067 if isinstance(node, ast.Tuple):
1068 return tuple(eval_node(e, context) for e in node.elts)
1069 if isinstance(node, ast.List):
1070 return [eval_node(e, context) for e in node.elts]
1071 if isinstance(node, ast.Set):
1072 return {eval_node(e, context) for e in node.elts}
1073 if isinstance(node, ast.Dict):
1074 return dict(
1075 zip(
1076 [eval_node(k, context) for k in node.keys],
1077 [eval_node(v, context) for v in node.values],
1078 )
1079 )
1080 if isinstance(node, ast.Slice):
1081 return slice(
1082 eval_node(node.lower, context),
1083 eval_node(node.upper, context),
1084 eval_node(node.step, context),
1085 )
1086 if isinstance(node, ast.UnaryOp):
1087 value = eval_node(node.operand, context)
1088 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
1089 if dunders:
1090 if policy.can_operate(dunders, value):
1091 try:
1092 return getattr(value, dunders[0])()
1093 except AttributeError:
1094 raise TypeError(
1095 f"bad operand type for unary {node.op}: {type(value)}"
1096 )
1097 else:
1098 raise GuardRejection(
1099 f"Operation (`{dunders}`) for",
1100 type(value),
1101 f"not allowed in {context.evaluation} mode",
1102 )
1103 if isinstance(node, ast.Subscript):
1104 value = eval_node(node.value, context)
1105 slice_ = eval_node(node.slice, context)
1106 if policy.can_get_item(value, slice_):
1107 return value[slice_]
1108 raise GuardRejection(
1109 "Subscript access (`__getitem__`) for",
1110 type(value), # not joined to avoid calling `repr`
1111 f" not allowed in {context.evaluation} mode",
1112 )
1113 if isinstance(node, ast.Name):
1114 return _eval_node_name(node.id, context)
1115 if isinstance(node, ast.Attribute):
1116 if (
1117 context.class_transients is not None
1118 and isinstance(node.value, ast.Name)
1119 and node.value.id == context.instance_arg_name
1120 ):
1121 return context.class_transients.get(node.attr)
1122 value = eval_node(node.value, context)
1123 if policy.can_get_attr(value, node.attr):
1124 return getattr(value, node.attr)
1125 try:
1126 cls = (
1127 value if isinstance(value, type) else getattr(value, "__class__", None)
1128 )
1129 if cls is not None:
1130 resolved_hints = get_type_hints(
1131 cls,
1132 globalns=(context.globals or {}),
1133 localns=(context.locals or {}),
1134 )
1135 if node.attr in resolved_hints:
1136 annotated = resolved_hints[node.attr]
1137 return _resolve_annotation(annotated, context)
1138 except Exception:
1139 # Fall through to the guard rejection
1140 pass
1141 raise GuardRejection(
1142 "Attribute access (`__getattr__`) for",
1143 type(value), # not joined to avoid calling `repr`
1144 f"not allowed in {context.evaluation} mode",
1145 )
1146 if isinstance(node, ast.IfExp):
1147 test = eval_node(node.test, context)
1148 if test:
1149 return eval_node(node.body, context)
1150 else:
1151 return eval_node(node.orelse, context)
1152 if isinstance(node, ast.Call):
1153 func = eval_node(node.func, context)
1154 if policy.can_call(func):
1155 args, kwargs = _extract_args_and_kwargs(node, context)
1156 return func(*args, **kwargs)
1157 if isclass(func):
1158 # this code path gets entered when calling class e.g. `MyClass()`
1159 # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
1160 # Should return `MyClass` if `__new__` is not overridden,
1161 # otherwise whatever `__new__` return type is.
1162 overridden_return_type = _eval_return_type(func.__new__, node, context)
1163 if overridden_return_type is not NOT_EVALUATED:
1164 return overridden_return_type
1165 return _create_duck_for_heap_type(func)
1166 else:
1167 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED)
1168 return_type = _eval_return_type(func, node, context)
1169 if getattr(func, "__is_async__", False):
1170 awaited_type = (
1171 inferred_return if inferred_return is not None else return_type
1172 )
1173 coroutine_duck = _Duck(attributes=_get_coroutine_attributes())
1174 coroutine_duck.__awaited_type__ = awaited_type
1175 return coroutine_duck
1176 if inferred_return is not NOT_EVALUATED:
1177 return inferred_return
1178 if return_type is not NOT_EVALUATED:
1179 return return_type
1180 raise GuardRejection(
1181 "Call for",
1182 func, # not joined to avoid calling `repr`
1183 f"not allowed in {context.evaluation} mode",
1184 )
1185 if isinstance(node, ast.Assert):
1186 # message is always the second item, so if it is defined user would be completing
1187 # on the message, not on the assertion test
1188 if node.msg:
1189 return eval_node(node.msg, context)
1190 return eval_node(node.test, context)
1191 return None
1192
1193
1194def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
1195 """Merge multiple dictionaries, combining values for each key."""
1196 if len(dicts) == 1:
1197 return dicts[0]
1198
1199 all_keys = set()
1200 for d in dicts:
1201 all_keys.update(d.keys())
1202
1203 merged = {}
1204 for key in all_keys:
1205 values = [d[key] for d in dicts if key in d]
1206 if values:
1207 merged[key] = _merge_values(values, policy)
1208
1209 return merged
1210
1211
1212def _merge_values(values, policy: EvaluationPolicy):
1213 """Recursively merge multiple values, combining attributes and dict items."""
1214 if len(values) == 1:
1215 return values[0]
1216
1217 types = {type(v) for v in values}
1218 merged_items = None
1219 key_values = {}
1220 attributes = set()
1221 for v in values:
1222 if policy.can_call(v.__dir__):
1223 attributes.update(dir(v))
1224 try:
1225 if policy.can_call(v.items):
1226 try:
1227 for k, val in v.items():
1228 key_values.setdefault(k, []).append(val)
1229 except Exception as e:
1230 pass
1231 elif policy.can_call(v.keys):
1232 try:
1233 for k in v.keys():
1234 key_values.setdefault(k, []).append(None)
1235 except Exception as e:
1236 pass
1237 except Exception as e:
1238 pass
1239
1240 if key_values:
1241 merged_items = {
1242 k: _merge_values(vals, policy) if vals[0] is not None else None
1243 for k, vals in key_values.items()
1244 }
1245
1246 if len(types) == 1:
1247 t = next(iter(types))
1248 if t not in (dict,) and not (
1249 hasattr(next(iter(values)), "__getitem__")
1250 and (
1251 hasattr(next(iter(values)), "items")
1252 or hasattr(next(iter(values)), "keys")
1253 )
1254 ):
1255 if t in (list, set, tuple):
1256 return t
1257 return values[0]
1258
1259 return _Duck(attributes=dict.fromkeys(attributes), items=merged_items)
1260
1261
1262def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
1263 """Infer the return value(s) of a function by evaluating all return statements."""
1264 return_values = _collect_return_values(node.body, context)
1265
1266 if not return_values:
1267 return None
1268 if len(return_values) == 1:
1269 return return_values[0]
1270
1271 policy = get_policy(context)
1272 return _merge_values(return_values, policy)
1273
1274
1275def _collect_return_values(body, context):
1276 """Recursively collect return values from a list of AST statements."""
1277 return_values = []
1278 for stmt in body:
1279 if isinstance(stmt, ast.Return):
1280 if stmt.value is None:
1281 continue
1282 try:
1283 value = eval_node(stmt.value, context)
1284 if value is not None and value is not NOT_EVALUATED:
1285 return_values.append(value)
1286 except Exception:
1287 pass
1288 if isinstance(
1289 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda)
1290 ):
1291 continue
1292 elif hasattr(stmt, "body") and isinstance(stmt.body, list):
1293 return_values.extend(_collect_return_values(stmt.body, context))
1294 if isinstance(stmt, ast.Try):
1295 for h in stmt.handlers:
1296 if hasattr(h, "body"):
1297 return_values.extend(_collect_return_values(h.body, context))
1298 if hasattr(stmt, "orelse"):
1299 return_values.extend(_collect_return_values(stmt.orelse, context))
1300 if hasattr(stmt, "finalbody"):
1301 return_values.extend(_collect_return_values(stmt.finalbody, context))
1302 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list):
1303 return_values.extend(_collect_return_values(stmt.orelse, context))
1304 return return_values
1305
1306
1307def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
1308 """Evaluate return type of a given callable function.
1309
1310 Returns the built-in type, a duck or NOT_EVALUATED sentinel.
1311 """
1312 try:
1313 sig = signature(func)
1314 except ValueError:
1315 sig = UNKNOWN_SIGNATURE
1316 # if annotation was not stringized, or it was stringized
1317 # but resolved by signature call we know the return type
1318 not_empty = sig.return_annotation is not Signature.empty
1319 if not_empty:
1320 return _resolve_annotation(sig.return_annotation, context, sig, func, node)
1321 return NOT_EVALUATED
1322
1323
1324def _eval_annotation(
1325 annotation: str,
1326 context: EvaluationContext,
1327):
1328 return (
1329 _eval_node_name(annotation, context)
1330 if isinstance(annotation, str)
1331 else annotation
1332 )
1333
1334
1335class _GetItemDuck(dict):
1336 """A dict subclass that always returns the factory instance and claims to have any item."""
1337
1338 def __init__(self, factory, *args, **kwargs):
1339 super().__init__(*args, **kwargs)
1340 self._factory = factory
1341
1342 def __getitem__(self, key):
1343 return self._factory()
1344
1345 def __contains__(self, key):
1346 return True
1347
1348
1349def _resolve_annotation(
1350 annotation: object | str,
1351 context: EvaluationContext,
1352 sig: Signature | None = None,
1353 func: Callable | None = None,
1354 node: ast.Call | None = None,
1355):
1356 """Resolve annotation created by user with `typing` module and custom objects."""
1357 if annotation is None:
1358 return None
1359 annotation = _eval_annotation(annotation, context)
1360 origin = get_origin(annotation)
1361 if annotation is Self and func and hasattr(func, "__self__"):
1362 return func.__self__
1363 elif origin is Literal:
1364 type_args = get_args(annotation)
1365 if len(type_args) == 1:
1366 return type_args[0]
1367 elif annotation is LiteralString:
1368 return ""
1369 elif annotation is AnyStr:
1370 index = None
1371 if func and hasattr(func, "__node__"):
1372 def_node = func.__node__
1373 for i, arg in enumerate(def_node.args.args):
1374 if not arg.annotation:
1375 continue
1376 annotation = _eval_annotation(arg.annotation.id, context)
1377 if annotation is AnyStr:
1378 index = i
1379 break
1380 is_bound_method = (
1381 isinstance(func, MethodType) and getattr(func, "__self__") is not None
1382 )
1383 if index and is_bound_method:
1384 index -= 1
1385 elif sig:
1386 for i, (key, value) in enumerate(sig.parameters.items()):
1387 if value.annotation is AnyStr:
1388 index = i
1389 break
1390 if index is None:
1391 return None
1392 if index < 0 or index >= len(node.args):
1393 return None
1394 return eval_node(node.args[index], context)
1395 elif origin is TypeGuard:
1396 return False
1397 elif origin is set or origin is list:
1398 # only one type argument allowed
1399 attributes = [
1400 attr
1401 for attr in dir(
1402 _resolve_annotation(get_args(annotation)[0], context, sig, func, node)
1403 )
1404 ]
1405 duck = _Duck(attributes=dict.fromkeys(attributes))
1406 return _Duck(
1407 attributes=dict.fromkeys(dir(origin())),
1408 # items are not strrictly needed for set
1409 items=_GetItemDuck(lambda: duck),
1410 )
1411 elif origin is tuple:
1412 # multiple type arguments
1413 return tuple(
1414 _resolve_annotation(arg, context, sig, func, node)
1415 for arg in get_args(annotation)
1416 )
1417 elif origin is Union:
1418 # multiple type arguments
1419 attributes = [
1420 attr
1421 for type_arg in get_args(annotation)
1422 for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
1423 ]
1424 return _Duck(attributes=dict.fromkeys(attributes))
1425 elif is_typeddict(annotation):
1426 return _Duck(
1427 attributes=dict.fromkeys(dir(dict())),
1428 items={
1429 k: _resolve_annotation(v, context, sig, func, node)
1430 for k, v in annotation.__annotations__.items()
1431 },
1432 )
1433 elif hasattr(annotation, "_is_protocol"):
1434 return _Duck(attributes=dict.fromkeys(dir(annotation)))
1435 elif origin is Annotated:
1436 type_arg = get_args(annotation)[0]
1437 return _resolve_annotation(type_arg, context, sig, func, node)
1438 elif isinstance(annotation, NewType):
1439 return _eval_or_create_duck(annotation.__supertype__, context)
1440 elif isinstance(annotation, TypeAliasType):
1441 return _eval_or_create_duck(annotation.__value__, context)
1442 else:
1443 return _eval_or_create_duck(annotation, context)
1444
1445
1446def _eval_node_name(node_id: str, context: EvaluationContext):
1447 policy = get_policy(context)
1448 if node_id in context.transient_locals:
1449 return context.transient_locals[node_id]
1450 if policy.allow_locals_access and node_id in context.locals:
1451 return context.locals[node_id]
1452 if policy.allow_globals_access and node_id in context.globals:
1453 return context.globals[node_id]
1454 if policy.allow_builtins_access and hasattr(builtins, node_id):
1455 # note: do not use __builtins__, it is implementation detail of cPython
1456 return getattr(builtins, node_id)
1457 if policy.allow_auto_import and context.auto_import:
1458 return context.auto_import(node_id)
1459 if not policy.allow_globals_access and not policy.allow_locals_access:
1460 raise GuardRejection(
1461 f"Namespace access not allowed in {context.evaluation} mode"
1462 )
1463 else:
1464 raise NameError(f"{node_id} not found in locals, globals, nor builtins")
1465
1466
1467def _eval_or_create_duck(duck_type, context: EvaluationContext):
1468 policy = get_policy(context)
1469 # if allow-listed builtin is on type annotation, instantiate it
1470 if policy.can_call(duck_type):
1471 return duck_type()
1472 # if custom class is in type annotation, mock it
1473 return _create_duck_for_heap_type(duck_type)
1474
1475
1476def _create_duck_for_heap_type(duck_type):
1477 """Create an imitation of an object of a given type (a duck).
1478
1479 Returns the duck or NOT_EVALUATED sentinel if duck could not be created.
1480 """
1481 duck = ImpersonatingDuck()
1482 try:
1483 # this only works for heap types, not builtins
1484 duck.__class__ = duck_type
1485 return duck
1486 except TypeError:
1487 pass
1488 return NOT_EVALUATED
1489
1490
1491SUPPORTED_EXTERNAL_GETITEM = {
1492 ("pandas", "core", "indexing", "_iLocIndexer"),
1493 ("pandas", "core", "indexing", "_LocIndexer"),
1494 ("pandas", "DataFrame"),
1495 ("pandas", "Series"),
1496 ("numpy", "ndarray"),
1497 ("numpy", "void"),
1498}
1499
1500
1501BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
1502 dict,
1503 str, # type: ignore[arg-type]
1504 bytes, # type: ignore[arg-type]
1505 list,
1506 tuple,
1507 type, # for type annotations like list[str]
1508 _Duck,
1509 collections.defaultdict,
1510 collections.deque,
1511 collections.OrderedDict,
1512 collections.ChainMap,
1513 collections.UserDict,
1514 collections.UserList,
1515 collections.UserString, # type: ignore[arg-type]
1516 _DummyNamedTuple,
1517 _IdentitySubscript,
1518}
1519
1520
1521def _list_methods(cls, source=None):
1522 """For use on immutable objects or with methods returning a copy"""
1523 return [getattr(cls, k) for k in (source if source else dir(cls))]
1524
1525
1526dict_non_mutating_methods = ("copy", "keys", "values", "items")
1527list_non_mutating_methods = ("copy", "index", "count")
1528set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))
1529
1530
1531dict_keys: type[collections.abc.KeysView] = type({}.keys())
1532dict_values: type = type({}.values())
1533dict_items: type = type({}.items())
1534
1535NUMERICS = {int, float, complex}
1536
1537ALLOWED_CALLS = {
1538 bytes,
1539 *_list_methods(bytes),
1540 bytes.__iter__,
1541 dict,
1542 *_list_methods(dict, dict_non_mutating_methods),
1543 dict.__iter__,
1544 dict_keys.__iter__,
1545 dict_values.__iter__,
1546 dict_items.__iter__,
1547 dict_keys.isdisjoint,
1548 list,
1549 *_list_methods(list, list_non_mutating_methods),
1550 list.__iter__,
1551 set,
1552 *_list_methods(set, set_non_mutating_methods),
1553 set.__iter__,
1554 frozenset,
1555 *_list_methods(frozenset),
1556 frozenset.__iter__,
1557 range,
1558 range.__iter__,
1559 str,
1560 *_list_methods(str),
1561 str.__iter__,
1562 tuple,
1563 *_list_methods(tuple),
1564 tuple.__iter__,
1565 bool,
1566 *_list_methods(bool),
1567 *NUMERICS,
1568 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
1569 collections.deque,
1570 *_list_methods(collections.deque, list_non_mutating_methods),
1571 collections.deque.__iter__,
1572 collections.defaultdict,
1573 *_list_methods(collections.defaultdict, dict_non_mutating_methods),
1574 collections.defaultdict.__iter__,
1575 collections.OrderedDict,
1576 *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
1577 collections.OrderedDict.__iter__,
1578 collections.UserDict,
1579 *_list_methods(collections.UserDict, dict_non_mutating_methods),
1580 collections.UserDict.__iter__,
1581 collections.UserList,
1582 *_list_methods(collections.UserList, list_non_mutating_methods),
1583 collections.UserList.__iter__,
1584 collections.UserString,
1585 *_list_methods(collections.UserString, dir(str)),
1586 collections.UserString.__iter__,
1587 collections.Counter,
1588 *_list_methods(collections.Counter, dict_non_mutating_methods),
1589 collections.Counter.__iter__,
1590 collections.Counter.elements,
1591 collections.Counter.most_common,
1592 object.__dir__,
1593 type.__dir__,
1594 _Duck.__dir__,
1595}
1596
1597BUILTIN_GETATTR: set[MayHaveGetattr] = {
1598 *BUILTIN_GETITEM,
1599 set,
1600 frozenset,
1601 object,
1602 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
1603 *NUMERICS,
1604 dict_keys,
1605 MethodDescriptorType,
1606 ModuleType,
1607}
1608
1609
1610BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
1611
1612EVALUATION_POLICIES = {
1613 "minimal": EvaluationPolicy(
1614 allow_builtins_access=True,
1615 allow_locals_access=False,
1616 allow_globals_access=False,
1617 allow_item_access=False,
1618 allow_attr_access=False,
1619 allowed_calls=set(),
1620 allow_any_calls=False,
1621 allow_all_operations=False,
1622 ),
1623 "limited": SelectivePolicy(
1624 allowed_getitem=BUILTIN_GETITEM,
1625 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
1626 allowed_getattr=BUILTIN_GETATTR,
1627 allowed_getattr_external={
1628 # pandas Series/Frame implements custom `__getattr__`
1629 ("pandas", "DataFrame"),
1630 ("pandas", "Series"),
1631 },
1632 allowed_operations=BUILTIN_OPERATIONS,
1633 allow_builtins_access=True,
1634 allow_locals_access=True,
1635 allow_globals_access=True,
1636 allow_getitem_on_types=True,
1637 allowed_calls=ALLOWED_CALLS,
1638 ),
1639 "unsafe": EvaluationPolicy(
1640 allow_builtins_access=True,
1641 allow_locals_access=True,
1642 allow_globals_access=True,
1643 allow_attr_access=True,
1644 allow_item_access=True,
1645 allow_any_calls=True,
1646 allow_all_operations=True,
1647 ),
1648}
1649
1650
1651__all__ = [
1652 "guarded_eval",
1653 "eval_node",
1654 "GuardRejection",
1655 "EvaluationContext",
1656 "_unbind_method",
1657]