1from copy import copy
2from inspect import isclass, signature, Signature, getmodule
3from typing import (
4 Annotated,
5 AnyStr,
6 Callable,
7 Literal,
8 NamedTuple,
9 NewType,
10 Optional,
11 Protocol,
12 Sequence,
13 TypeGuard,
14 Union,
15 get_args,
16 get_origin,
17 is_typeddict,
18)
19import ast
20import builtins
21import collections
22import dataclasses
23import operator
24import sys
25import typing
26import warnings
27from functools import cached_property
28from dataclasses import dataclass, field
29from types import MethodDescriptorType, ModuleType, MethodType
30
31from IPython.utils.decorators import undoc
32
33
34from typing import Self, LiteralString
35
36if sys.version_info < (3, 12):
37 from typing_extensions import TypeAliasType
38else:
39 from typing import TypeAliasType
40
41
42@undoc
43class HasGetItem(Protocol):
44 def __getitem__(self, key) -> None:
45 ...
46
47
48@undoc
49class InstancesHaveGetItem(Protocol):
50 def __call__(self, *args, **kwargs) -> HasGetItem:
51 ...
52
53
54@undoc
55class HasGetAttr(Protocol):
56 def __getattr__(self, key) -> None:
57 ...
58
59
60@undoc
61class DoesNotHaveGetAttr(Protocol):
62 pass
63
64
65# By default `__getattr__` is not explicitly implemented on most objects
66MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
67
68
69def _unbind_method(func: Callable) -> Union[Callable, None]:
70 """Get unbound method for given bound method.
71
72 Returns None if cannot get unbound method, or method is already unbound.
73 """
74 owner = getattr(func, "__self__", None)
75 owner_class = type(owner)
76 name = getattr(func, "__name__", None)
77 instance_dict_overrides = getattr(owner, "__dict__", None)
78 if (
79 owner is not None
80 and name
81 and (
82 not instance_dict_overrides
83 or (instance_dict_overrides and name not in instance_dict_overrides)
84 )
85 ):
86 return getattr(owner_class, name)
87 return None
88
89
90@undoc
91@dataclass
92class EvaluationPolicy:
93 """Definition of evaluation policy."""
94
95 allow_locals_access: bool = False
96 allow_globals_access: bool = False
97 allow_item_access: bool = False
98 allow_attr_access: bool = False
99 allow_builtins_access: bool = False
100 allow_all_operations: bool = False
101 allow_any_calls: bool = False
102 allow_auto_import: bool = False
103 allowed_calls: set[Callable] = field(default_factory=set)
104
105 def can_get_item(self, value, item):
106 return self.allow_item_access
107
108 def can_get_attr(self, value, attr):
109 return self.allow_attr_access
110
111 def can_operate(self, dunders: tuple[str, ...], a, b=None):
112 if self.allow_all_operations:
113 return True
114
115 def can_call(self, func):
116 if self.allow_any_calls:
117 return True
118
119 if func in self.allowed_calls:
120 return True
121
122 owner_method = _unbind_method(func)
123
124 if owner_method and owner_method in self.allowed_calls:
125 return True
126
127
128def _get_external(module_name: str, access_path: Sequence[str]):
129 """Get value from external module given a dotted access path.
130
131 Only gets value if the module is already imported.
132
133 Raises:
134 * `KeyError` if module is removed not found, and
135 * `AttributeError` if access path does not match an exported object
136 """
137 try:
138 member_type = sys.modules[module_name]
139 # standard module
140 for attr in access_path:
141 member_type = getattr(member_type, attr)
142 return member_type
143 except (KeyError, AttributeError):
144 # handle modules in namespace packages
145 module_path = ".".join([module_name, *access_path])
146 if module_path in sys.modules:
147 return sys.modules[module_path]
148 raise
149
150
151def _has_original_dunder_external(
152 value,
153 module_name: str,
154 access_path: Sequence[str],
155 method_name: str,
156):
157 if module_name not in sys.modules:
158 full_module_path = ".".join([module_name, *access_path])
159 if full_module_path not in sys.modules:
160 # LBYLB as it is faster
161 return False
162 try:
163 member_type = _get_external(module_name, access_path)
164 value_type = type(value)
165 if type(value) == member_type:
166 return True
167 if isinstance(member_type, ModuleType):
168 value_module = getmodule(value_type)
169 if not value_module or not value_module.__name__:
170 return False
171 if (
172 value_module.__name__ == member_type.__name__
173 or value_module.__name__.startswith(member_type.__name__ + ".")
174 ):
175 return True
176 if method_name == "__getattribute__":
177 # we have to short-circuit here due to an unresolved issue in
178 # `isinstance` implementation: https://bugs.python.org/issue32683
179 return False
180 if not isinstance(member_type, ModuleType) and isinstance(value, member_type):
181 method = getattr(value_type, method_name, None)
182 member_method = getattr(member_type, method_name, None)
183 if member_method == method:
184 return True
185 if isinstance(member_type, ModuleType):
186 method = getattr(value_type, method_name, None)
187 for base_class in value_type.__mro__[1:]:
188 base_module = getmodule(base_class)
189 if base_module and (
190 base_module.__name__ == member_type.__name__
191 or base_module.__name__.startswith(member_type.__name__ + ".")
192 ):
193 # Check if the method comes from this trusted base class
194 base_method = getattr(base_class, method_name, None)
195 if base_method is not None and base_method == method:
196 return True
197 except (AttributeError, KeyError):
198 return False
199
200
201def _has_original_dunder(
202 value, allowed_types, allowed_methods, allowed_external, method_name
203):
204 # note: Python ignores `__getattr__`/`__getitem__` on instances,
205 # we only need to check at class level
206 value_type = type(value)
207
208 # strict type check passes → no need to check method
209 if value_type in allowed_types:
210 return True
211
212 method = getattr(value_type, method_name, None)
213
214 if method is None:
215 return None
216
217 if method in allowed_methods:
218 return True
219
220 for module_name, *access_path in allowed_external:
221 if _has_original_dunder_external(value, module_name, access_path, method_name):
222 return True
223
224 return False
225
226
227def _coerce_path_to_tuples(
228 allow_list: set[tuple[str, ...] | str],
229) -> set[tuple[str, ...]]:
230 """Replace dotted paths on the provided allow-list with tuples."""
231 return {
232 path if isinstance(path, tuple) else tuple(path.split("."))
233 for path in allow_list
234 }
235
236
237@undoc
238@dataclass
239class SelectivePolicy(EvaluationPolicy):
240 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
241 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)
242
243 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
244 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)
245
246 allowed_operations: set = field(default_factory=set)
247 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)
248
249 allow_getitem_on_types: bool = field(default_factory=bool)
250
251 _operation_methods_cache: dict[str, set[Callable]] = field(
252 default_factory=dict, init=False
253 )
254
255 def can_get_attr(self, value, attr):
256 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)
257
258 has_original_attribute = _has_original_dunder(
259 value,
260 allowed_types=self.allowed_getattr,
261 allowed_methods=self._getattribute_methods,
262 allowed_external=allowed_getattr_external,
263 method_name="__getattribute__",
264 )
265 has_original_attr = _has_original_dunder(
266 value,
267 allowed_types=self.allowed_getattr,
268 allowed_methods=self._getattr_methods,
269 allowed_external=allowed_getattr_external,
270 method_name="__getattr__",
271 )
272
273 accept = False
274
275 # Many objects do not have `__getattr__`, this is fine.
276 if has_original_attr is None and has_original_attribute:
277 accept = True
278 else:
279 # Accept objects without modifications to `__getattr__` and `__getattribute__`
280 accept = has_original_attr and has_original_attribute
281
282 if accept:
283 # We still need to check for overridden properties.
284
285 value_class = type(value)
286 if not hasattr(value_class, attr):
287 return True
288
289 class_attr_val = getattr(value_class, attr)
290 is_property = isinstance(class_attr_val, property)
291
292 if not is_property:
293 return True
294
295 # Properties in allowed types are ok (although we do not include any
296 # properties in our default allow list currently).
297 if type(value) in self.allowed_getattr:
298 return True # pragma: no cover
299
300 # Properties in subclasses of allowed types may be ok if not changed
301 for module_name, *access_path in allowed_getattr_external:
302 try:
303 external_class = _get_external(module_name, access_path)
304 external_class_attr_val = getattr(external_class, attr)
305 except (KeyError, AttributeError):
306 return False # pragma: no cover
307 return class_attr_val == external_class_attr_val
308
309 return False
310
311 def can_get_item(self, value, item):
312 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified."""
313 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
314 if self.allow_getitem_on_types:
315 # e.g. Union[str, int] or Literal[True, 1]
316 if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
317 return True
318 # PEP 560 e.g. list[str]
319 if isinstance(value, type) and hasattr(value, "__class_getitem__"):
320 return True
321 return _has_original_dunder(
322 value,
323 allowed_types=self.allowed_getitem,
324 allowed_methods=self._getitem_methods,
325 allowed_external=allowed_getitem_external,
326 method_name="__getitem__",
327 )
328
329 def can_operate(self, dunders: tuple[str, ...], a, b=None):
330 allowed_operations_external = _coerce_path_to_tuples(
331 self.allowed_operations_external
332 )
333 objects = [a]
334 if b is not None:
335 objects.append(b)
336 return all(
337 [
338 _has_original_dunder(
339 obj,
340 allowed_types=self.allowed_operations,
341 allowed_methods=self._operator_dunder_methods(dunder),
342 allowed_external=allowed_operations_external,
343 method_name=dunder,
344 )
345 for dunder in dunders
346 for obj in objects
347 ]
348 )
349
350 def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
351 if dunder not in self._operation_methods_cache:
352 self._operation_methods_cache[dunder] = self._safe_get_methods(
353 self.allowed_operations, dunder
354 )
355 return self._operation_methods_cache[dunder]
356
357 @cached_property
358 def _getitem_methods(self) -> set[Callable]:
359 return self._safe_get_methods(self.allowed_getitem, "__getitem__")
360
361 @cached_property
362 def _getattr_methods(self) -> set[Callable]:
363 return self._safe_get_methods(self.allowed_getattr, "__getattr__")
364
365 @cached_property
366 def _getattribute_methods(self) -> set[Callable]:
367 return self._safe_get_methods(self.allowed_getattr, "__getattribute__")
368
369 def _safe_get_methods(self, classes, name) -> set[Callable]:
370 return {
371 method
372 for class_ in classes
373 for method in [getattr(class_, name, None)]
374 if method
375 }
376
377
378class _DummyNamedTuple(NamedTuple):
379 """Used internally to retrieve methods of named tuple instance."""
380
381
382EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]
383
384
385@dataclass
386class EvaluationContext:
387 #: Local namespace
388 locals: dict
389 #: Global namespace
390 globals: dict
391 #: Evaluation policy identifier
392 evaluation: EvaluationPolicyName = "forbidden"
393 #: Whether the evaluation of code takes place inside of a subscript.
394 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
395 in_subscript: bool = False
396 #: Auto import method
397 auto_import: Callable[list[str], ModuleType] | None = None
398 #: Overrides for evaluation policy
399 policy_overrides: dict = field(default_factory=dict)
400 #: Transient local namespace used to store mocks
401 transient_locals: dict = field(default_factory=dict)
402 #: Transients of class level
403 class_transients: dict | None = None
404 #: Instance variable name used in the method definition
405 instance_arg_name: str | None = None
406
407 def replace(self, /, **changes):
408 """Return a new copy of the context, with specified changes"""
409 return dataclasses.replace(self, **changes)
410
411
412class _IdentitySubscript:
413 """Returns the key itself when item is requested via subscript."""
414
415 def __getitem__(self, key):
416 return key
417
418
419IDENTITY_SUBSCRIPT = _IdentitySubscript()
420SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
421UNKNOWN_SIGNATURE = Signature()
422NOT_EVALUATED = object()
423
424
425class GuardRejection(Exception):
426 """Exception raised when guard rejects evaluation attempt."""
427
428 pass
429
430
431def guarded_eval(code: str, context: EvaluationContext):
432 """Evaluate provided code in the evaluation context.
433
434 If evaluation policy given by context is set to ``forbidden``
435 no evaluation will be performed; if it is set to ``dangerous``
436 standard :func:`eval` will be used; finally, for any other,
437 policy :func:`eval_node` will be called on parsed AST.
438 """
439 locals_ = context.locals
440
441 if context.evaluation == "forbidden":
442 raise GuardRejection("Forbidden mode")
443
444 # note: not using `ast.literal_eval` as it does not implement
445 # getitem at all, for example it fails on simple `[0][1]`
446
447 if context.in_subscript:
448 # syntactic sugar for ellipsis (:) is only available in subscripts
449 # so we need to trick the ast parser into thinking that we have
450 # a subscript, but we need to be able to later recognise that we did
451 # it so we can ignore the actual __getitem__ operation
452 if not code:
453 return tuple()
454 locals_ = locals_.copy()
455 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
456 code = SUBSCRIPT_MARKER + "[" + code + "]"
457 context = context.replace(locals=locals_)
458
459 if context.evaluation == "dangerous":
460 return eval(code, context.globals, context.locals)
461
462 node = ast.parse(code, mode="exec")
463
464 return eval_node(node, context)
465
466
467BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
468 ast.Add: ("__add__",),
469 ast.Sub: ("__sub__",),
470 ast.Mult: ("__mul__",),
471 ast.Div: ("__truediv__",),
472 ast.FloorDiv: ("__floordiv__",),
473 ast.Mod: ("__mod__",),
474 ast.Pow: ("__pow__",),
475 ast.LShift: ("__lshift__",),
476 ast.RShift: ("__rshift__",),
477 ast.BitOr: ("__or__",),
478 ast.BitXor: ("__xor__",),
479 ast.BitAnd: ("__and__",),
480 ast.MatMult: ("__matmul__",),
481}
482
483COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
484 ast.Eq: ("__eq__",),
485 ast.NotEq: ("__ne__", "__eq__"),
486 ast.Lt: ("__lt__", "__gt__"),
487 ast.LtE: ("__le__", "__ge__"),
488 ast.Gt: ("__gt__", "__lt__"),
489 ast.GtE: ("__ge__", "__le__"),
490 ast.In: ("__contains__",),
491 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
492}
493
494UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
495 ast.USub: ("__neg__",),
496 ast.UAdd: ("__pos__",),
497 # we have to check both __inv__ and __invert__!
498 ast.Invert: ("__invert__", "__inv__"),
499 ast.Not: ("__not__",),
500}
501
502
503class ImpersonatingDuck:
504 """A dummy class used to create objects of other classes without calling their ``__init__``"""
505
506 # no-op: override __class__ to impersonate
507
508
509class _Duck:
510 """A dummy class used to create objects pretending to have given attributes"""
511
512 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None):
513 self.attributes = attributes if attributes is not None else {}
514 self.items = items if items is not None else {}
515
516 def __getattr__(self, attr: str):
517 return self.attributes[attr]
518
519 def __hasattr__(self, attr: str):
520 return attr in self.attributes
521
522 def __dir__(self):
523 return [*dir(super), *self.attributes]
524
525 def __getitem__(self, key: str):
526 return self.items[key]
527
528 def __hasitem__(self, key: str):
529 return self.items[key]
530
531 def _ipython_key_completions_(self):
532 return self.items.keys()
533
534
535def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]:
536 dunder = None
537 for op, candidate_dunder in dunders.items():
538 if isinstance(node_op, op):
539 dunder = candidate_dunder
540 return dunder
541
542
543def get_policy(context: EvaluationContext) -> EvaluationPolicy:
544 policy = copy(EVALUATION_POLICIES[context.evaluation])
545
546 for key, value in context.policy_overrides.items():
547 if hasattr(policy, key):
548 setattr(policy, key, value)
549 return policy
550
551
552def _validate_policy_overrides(
553 policy_name: EvaluationPolicyName, policy_overrides: dict
554) -> bool:
555 policy = EVALUATION_POLICIES[policy_name]
556
557 all_good = True
558 for key, value in policy_overrides.items():
559 if not hasattr(policy, key):
560 warnings.warn(
561 f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
562 )
563 all_good = False
564 return all_good
565
566
567def _handle_assign(node: ast.Assign, context: EvaluationContext):
568 value = eval_node(node.value, context)
569 transient_locals = context.transient_locals
570 policy = get_policy(context)
571 class_transients = context.class_transients
572 for target in node.targets:
573 if isinstance(target, (ast.Tuple, ast.List)):
574 # Handle unpacking assignment
575 values = list(value)
576 targets = target.elts
577 starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]
578
579 # Unified handling: treat no starred as starred at end
580 star_or_last_idx = starred[0] if starred else len(targets)
581
582 # Before starred
583 for i in range(star_or_last_idx):
584 # Check for self.x assignment
585 if _is_instance_attribute_assignment(targets[i], context):
586 class_transients[targets[i].attr] = values[i]
587 else:
588 transient_locals[targets[i].id] = values[i]
589
590 # Starred if exists
591 if starred:
592 end = len(values) - (len(targets) - star_or_last_idx - 1)
593 if _is_instance_attribute_assignment(
594 targets[star_or_last_idx], context
595 ):
596 class_transients[targets[star_or_last_idx].attr] = values[
597 star_or_last_idx:end
598 ]
599 else:
600 transient_locals[targets[star_or_last_idx].value.id] = values[
601 star_or_last_idx:end
602 ]
603
604 # After starred
605 for i in range(star_or_last_idx + 1, len(targets)):
606 if _is_instance_attribute_assignment(targets[i], context):
607 class_transients[targets[i].attr] = values[
608 len(values) - (len(targets) - i)
609 ]
610 else:
611 transient_locals[targets[i].id] = values[
612 len(values) - (len(targets) - i)
613 ]
614 elif isinstance(target, ast.Subscript):
615 if isinstance(target.value, ast.Name):
616 name = target.value.id
617 container = transient_locals.get(name)
618 if container is None:
619 container = context.locals.get(name)
620 if container is None:
621 container = context.globals.get(name)
622 if container is None:
623 raise NameError(
624 f"{name} not found in locals, globals, nor builtins"
625 )
626 storage_dict = transient_locals
627 storage_key = name
628 elif isinstance(
629 target.value, ast.Attribute
630 ) and _is_instance_attribute_assignment(target.value, context):
631 attr = target.value.attr
632 container = class_transients.get(attr, None)
633 if container is None:
634 raise NameError(f"{attr} not found in class transients")
635 storage_dict = class_transients
636 storage_key = attr
637 else:
638 return
639
640 key = eval_node(target.slice, context)
641 attributes = (
642 dict.fromkeys(dir(container))
643 if policy.can_call(container.__dir__)
644 else {}
645 )
646 items = {}
647
648 if policy.can_get_item(container, None):
649 try:
650 items = dict(container.items())
651 except Exception:
652 pass
653
654 items[key] = value
655 duck_container = _Duck(attributes=attributes, items=items)
656 storage_dict[storage_key] = duck_container
657 elif _is_instance_attribute_assignment(target, context):
658 class_transients[target.attr] = value
659 else:
660 transient_locals[target.id] = value
661 return None
662
663
664def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
665 args = [eval_node(arg, context) for arg in node.args]
666 kwargs = {
667 k: v
668 for kw in node.keywords
669 for k, v in (
670 {kw.arg: eval_node(kw.value, context)}
671 if kw.arg
672 else eval_node(kw.value, context)
673 ).items()
674 }
675 return args, kwargs
676
677
678def _is_instance_attribute_assignment(
679 target: ast.AST, context: EvaluationContext
680) -> bool:
681 """Return True if target is an attribute access on the instance argument."""
682 return (
683 context.class_transients is not None
684 and context.instance_arg_name is not None
685 and isinstance(target, ast.Attribute)
686 and isinstance(getattr(target, "value", None), ast.Name)
687 and getattr(target.value, "id", None) == context.instance_arg_name
688 )
689
690
691def _get_coroutine_attributes() -> dict[str, Optional[object]]:
692 async def _dummy():
693 return None
694
695 coro = _dummy()
696 try:
697 return {attr: getattr(coro, attr, None) for attr in dir(coro)}
698 finally:
699 coro.close()
700
701
702def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
703 """Evaluate AST node in provided context.
704
705 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.
706
707 Does not evaluate actions that always have side effects:
708
709 - class definitions (``class sth: ...``)
710 - function definitions (``def sth: ...``)
711 - variable assignments (``x = 1``)
712 - augmented assignments (``x += 1``)
713 - deletions (``del x``)
714
715 Does not evaluate operations which do not return values:
716
717 - assertions (``assert x``)
718 - pass (``pass``)
719 - imports (``import x``)
720 - control flow:
721
722 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
723 - loops (``for`` and ``while``)
724 - exception handling
725
726 The purpose of this function is to guard against unwanted side-effects;
727 it does not give guarantees on protection from malicious code execution.
728 """
729 policy = get_policy(context)
730
731 if node is None:
732 return None
733 if isinstance(node, (ast.Interactive, ast.Module)):
734 result = None
735 for child_node in node.body:
736 result = eval_node(child_node, context)
737 return result
738 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
739 is_async = isinstance(node, ast.AsyncFunctionDef)
740 func_locals = context.transient_locals.copy()
741 func_context = context.replace(transient_locals=func_locals)
742 is_property = False
743 is_static = False
744 is_classmethod = False
745 for decorator_node in node.decorator_list:
746 try:
747 decorator = eval_node(decorator_node, context)
748 except NameError:
749 # if the decorator is not yet defined this is fine
750 # especialy because we don't handle imports yet
751 continue
752 if decorator is property:
753 is_property = True
754 elif decorator is staticmethod:
755 is_static = True
756 elif decorator is classmethod:
757 is_classmethod = True
758
759 if func_context.class_transients is not None:
760 if not is_static and not is_classmethod:
761 func_context.instance_arg_name = (
762 node.args.args[0].arg if node.args.args else None
763 )
764
765 return_type = eval_node(node.returns, context=context)
766
767 for child_node in node.body:
768 eval_node(child_node, func_context)
769
770 if is_property:
771 if return_type is not None:
772 context.transient_locals[node.name] = _resolve_annotation(
773 return_type, context
774 )
775 else:
776 return_value = _infer_return_value(node, func_context)
777 context.transient_locals[node.name] = return_value
778
779 return None
780
781 def dummy_function(*args, **kwargs):
782 pass
783
784 if return_type is not None:
785 dummy_function.__annotations__["return"] = return_type
786 else:
787 inferred_return = _infer_return_value(node, func_context)
788 if inferred_return is not None:
789 dummy_function.__inferred_return__ = inferred_return
790
791 dummy_function.__name__ = node.name
792 dummy_function.__node__ = node
793 dummy_function.__is_async__ = is_async
794 context.transient_locals[node.name] = dummy_function
795 return None
796 if isinstance(node, ast.Lambda):
797
798 def dummy_function(*args, **kwargs):
799 pass
800
801 dummy_function.__inferred_return__ = eval_node(node.body, context)
802 return dummy_function
803 if isinstance(node, ast.ClassDef):
804 # TODO support class decorators?
805 class_locals = {}
806 outer_locals = context.locals.copy()
807 outer_locals.update(context.transient_locals)
808 class_context = context.replace(
809 transient_locals=class_locals, locals=outer_locals
810 )
811 class_context.class_transients = class_locals
812 for child_node in node.body:
813 eval_node(child_node, class_context)
814 bases = tuple([eval_node(base, context) for base in node.bases])
815 dummy_class = type(node.name, bases, class_locals)
816 context.transient_locals[node.name] = dummy_class
817 return None
818 if isinstance(node, ast.Await):
819 value = eval_node(node.value, context)
820 if hasattr(value, "__awaited_type__"):
821 return value.__awaited_type__
822 return value
823 if isinstance(node, ast.While):
824 loop_locals = context.transient_locals.copy()
825 loop_context = context.replace(transient_locals=loop_locals)
826
827 result = None
828 for stmt in node.body:
829 result = eval_node(stmt, loop_context)
830
831 policy = get_policy(context)
832 merged_locals = _merge_dicts_by_key(
833 [loop_locals, context.transient_locals.copy()], policy
834 )
835 context.transient_locals.update(merged_locals)
836
837 return result
838 if isinstance(node, ast.For):
839 try:
840 iterable = eval_node(node.iter, context)
841 except Exception:
842 iterable = None
843
844 sample = None
845 if iterable is not None:
846 try:
847 if policy.can_call(getattr(iterable, "__iter__", None)):
848 sample = next(iter(iterable))
849 except Exception:
850 sample = None
851
852 loop_locals = context.transient_locals.copy()
853 loop_context = context.replace(transient_locals=loop_locals)
854
855 if sample is not None:
856 try:
857 fake_assign = ast.Assign(
858 targets=[node.target], value=ast.Constant(value=sample)
859 )
860 _handle_assign(fake_assign, loop_context)
861 except Exception:
862 pass
863
864 result = None
865 for stmt in node.body:
866 result = eval_node(stmt, loop_context)
867
868 policy = get_policy(context)
869 merged_locals = _merge_dicts_by_key(
870 [loop_locals, context.transient_locals.copy()], policy
871 )
872 context.transient_locals.update(merged_locals)
873
874 return result
875 if isinstance(node, ast.If):
876 branches = []
877 current = node
878 result = None
879 while True:
880 branch_locals = context.transient_locals.copy()
881 branch_context = context.replace(transient_locals=branch_locals)
882 for stmt in current.body:
883 result = eval_node(stmt, branch_context)
884 branches.append(branch_locals)
885 if not current.orelse:
886 break
887 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If):
888 # It's an elif - continue loop
889 current = current.orelse[0]
890 else:
891 # It's an else block - process and break
892 else_locals = context.transient_locals.copy()
893 else_context = context.replace(transient_locals=else_locals)
894 for stmt in current.orelse:
895 result = eval_node(stmt, else_context)
896 branches.append(else_locals)
897 break
898 branches.append(context.transient_locals.copy())
899 policy = get_policy(context)
900 merged_locals = _merge_dicts_by_key(branches, policy)
901 context.transient_locals.update(merged_locals)
902 return result
903 if isinstance(node, ast.Assign):
904 return _handle_assign(node, context)
905 if isinstance(node, ast.AnnAssign):
906 if node.simple:
907 value = _resolve_annotation(eval_node(node.annotation, context), context)
908 context.transient_locals[node.target.id] = value
909 # Handle non-simple annotated assignments only for self.x: type = value
910 if _is_instance_attribute_assignment(node.target, context):
911 value = _resolve_annotation(eval_node(node.annotation, context), context)
912 context.class_transients[node.target.attr] = value
913 return None
914 if isinstance(node, ast.Expression):
915 return eval_node(node.body, context)
916 if isinstance(node, ast.Expr):
917 return eval_node(node.value, context)
918 if isinstance(node, ast.Pass):
919 return None
920 if isinstance(node, ast.Import):
921 # TODO: populate transient_locals
922 return None
923 if isinstance(node, (ast.AugAssign, ast.Delete)):
924 return None
925 if isinstance(node, (ast.Global, ast.Nonlocal)):
926 return None
927 if isinstance(node, ast.BinOp):
928 left = eval_node(node.left, context)
929 right = eval_node(node.right, context)
930 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
931 if dunders:
932 if policy.can_operate(dunders, left, right):
933 return getattr(left, dunders[0])(right)
934 else:
935 raise GuardRejection(
936 f"Operation (`{dunders}`) for",
937 type(left),
938 f"not allowed in {context.evaluation} mode",
939 )
940 if isinstance(node, ast.Compare):
941 left = eval_node(node.left, context)
942 all_true = True
943 negate = False
944 for op, right in zip(node.ops, node.comparators):
945 right = eval_node(right, context)
946 dunder = None
947 dunders = _find_dunder(op, COMP_OP_DUNDERS)
948 if not dunders:
949 if isinstance(op, ast.NotIn):
950 dunders = COMP_OP_DUNDERS[ast.In]
951 negate = True
952 if isinstance(op, ast.Is):
953 dunder = "is_"
954 if isinstance(op, ast.IsNot):
955 dunder = "is_"
956 negate = True
957 if not dunder and dunders:
958 dunder = dunders[0]
959 if dunder:
960 a, b = (right, left) if dunder == "__contains__" else (left, right)
961 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
962 result = getattr(operator, dunder)(a, b)
963 if negate:
964 result = not result
965 if not result:
966 all_true = False
967 left = right
968 else:
969 raise GuardRejection(
970 f"Comparison (`{dunder}`) for",
971 type(left),
972 f"not allowed in {context.evaluation} mode",
973 )
974 else:
975 raise ValueError(
976 f"Comparison `{dunder}` not supported"
977 ) # pragma: no cover
978 return all_true
979 if isinstance(node, ast.Constant):
980 return node.value
981 if isinstance(node, ast.Tuple):
982 return tuple(eval_node(e, context) for e in node.elts)
983 if isinstance(node, ast.List):
984 return [eval_node(e, context) for e in node.elts]
985 if isinstance(node, ast.Set):
986 return {eval_node(e, context) for e in node.elts}
987 if isinstance(node, ast.Dict):
988 return dict(
989 zip(
990 [eval_node(k, context) for k in node.keys],
991 [eval_node(v, context) for v in node.values],
992 )
993 )
994 if isinstance(node, ast.Slice):
995 return slice(
996 eval_node(node.lower, context),
997 eval_node(node.upper, context),
998 eval_node(node.step, context),
999 )
1000 if isinstance(node, ast.UnaryOp):
1001 value = eval_node(node.operand, context)
1002 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
1003 if dunders:
1004 if policy.can_operate(dunders, value):
1005 try:
1006 return getattr(value, dunders[0])()
1007 except AttributeError:
1008 raise TypeError(
1009 f"bad operand type for unary {node.op}: {type(value)}"
1010 )
1011 else:
1012 raise GuardRejection(
1013 f"Operation (`{dunders}`) for",
1014 type(value),
1015 f"not allowed in {context.evaluation} mode",
1016 )
1017 if isinstance(node, ast.Subscript):
1018 value = eval_node(node.value, context)
1019 slice_ = eval_node(node.slice, context)
1020 if policy.can_get_item(value, slice_):
1021 return value[slice_]
1022 raise GuardRejection(
1023 "Subscript access (`__getitem__`) for",
1024 type(value), # not joined to avoid calling `repr`
1025 f" not allowed in {context.evaluation} mode",
1026 )
1027 if isinstance(node, ast.Name):
1028 return _eval_node_name(node.id, context)
1029 if isinstance(node, ast.Attribute):
1030 if (
1031 context.class_transients is not None
1032 and isinstance(node.value, ast.Name)
1033 and node.value.id == context.instance_arg_name
1034 ):
1035 return context.class_transients.get(node.attr)
1036 value = eval_node(node.value, context)
1037 if policy.can_get_attr(value, node.attr):
1038 return getattr(value, node.attr)
1039 raise GuardRejection(
1040 "Attribute access (`__getattr__`) for",
1041 type(value), # not joined to avoid calling `repr`
1042 f"not allowed in {context.evaluation} mode",
1043 )
1044 if isinstance(node, ast.IfExp):
1045 test = eval_node(node.test, context)
1046 if test:
1047 return eval_node(node.body, context)
1048 else:
1049 return eval_node(node.orelse, context)
1050 if isinstance(node, ast.Call):
1051 func = eval_node(node.func, context)
1052 if policy.can_call(func):
1053 args, kwargs = _extract_args_and_kwargs(node, context)
1054 return func(*args, **kwargs)
1055 if isclass(func):
1056 # this code path gets entered when calling class e.g. `MyClass()`
1057 # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
1058 # Should return `MyClass` if `__new__` is not overridden,
1059 # otherwise whatever `__new__` return type is.
1060 overridden_return_type = _eval_return_type(func.__new__, node, context)
1061 if overridden_return_type is not NOT_EVALUATED:
1062 return overridden_return_type
1063 return _create_duck_for_heap_type(func)
1064 else:
1065 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED)
1066 return_type = _eval_return_type(func, node, context)
1067 if getattr(func, "__is_async__", False):
1068 awaited_type = (
1069 inferred_return if inferred_return is not None else return_type
1070 )
1071 coroutine_duck = _Duck(attributes=_get_coroutine_attributes())
1072 coroutine_duck.__awaited_type__ = awaited_type
1073 return coroutine_duck
1074 if inferred_return is not NOT_EVALUATED:
1075 return inferred_return
1076 if return_type is not NOT_EVALUATED:
1077 return return_type
1078 raise GuardRejection(
1079 "Call for",
1080 func, # not joined to avoid calling `repr`
1081 f"not allowed in {context.evaluation} mode",
1082 )
1083 if isinstance(node, ast.Assert):
1084 # message is always the second item, so if it is defined user would be completing
1085 # on the message, not on the assertion test
1086 if node.msg:
1087 return eval_node(node.msg, context)
1088 return eval_node(node.test, context)
1089 return None
1090
1091
1092def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
1093 """Merge multiple dictionaries, combining values for each key."""
1094 if len(dicts) == 1:
1095 return dicts[0]
1096
1097 all_keys = set()
1098 for d in dicts:
1099 all_keys.update(d.keys())
1100
1101 merged = {}
1102 for key in all_keys:
1103 values = [d[key] for d in dicts if key in d]
1104 if values:
1105 merged[key] = _merge_values(values, policy)
1106
1107 return merged
1108
1109
1110def _merge_values(values, policy: EvaluationPolicy):
1111 """Recursively merge multiple values, combining attributes and dict items."""
1112 if len(values) == 1:
1113 return values[0]
1114
1115 types = {type(v) for v in values}
1116 merged_items = None
1117 key_values = {}
1118 attributes = set()
1119 for v in values:
1120 if policy.can_call(v.__dir__):
1121 attributes.update(dir(v))
1122 try:
1123 if policy.can_call(v.items):
1124 try:
1125 for k, val in v.items():
1126 key_values.setdefault(k, []).append(val)
1127 except Exception as e:
1128 pass
1129 elif policy.can_call(v.keys):
1130 try:
1131 for k in v.keys():
1132 key_values.setdefault(k, []).append(None)
1133 except Exception as e:
1134 pass
1135 except Exception as e:
1136 pass
1137
1138 if key_values:
1139 merged_items = {
1140 k: _merge_values(vals, policy) if vals[0] is not None else None
1141 for k, vals in key_values.items()
1142 }
1143
1144 if len(types) == 1:
1145 t = next(iter(types))
1146 if t not in (dict,) and not (
1147 hasattr(next(iter(values)), "__getitem__")
1148 and (
1149 hasattr(next(iter(values)), "items")
1150 or hasattr(next(iter(values)), "keys")
1151 )
1152 ):
1153 if t in (list, set, tuple):
1154 return t
1155 return values[0]
1156
1157 return _Duck(attributes=dict.fromkeys(attributes), items=merged_items)
1158
1159
1160def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
1161 """Infer the return value(s) of a function by evaluating all return statements."""
1162 return_values = _collect_return_values(node.body, context)
1163
1164 if not return_values:
1165 return None
1166 if len(return_values) == 1:
1167 return return_values[0]
1168
1169 policy = get_policy(context)
1170 return _merge_values(return_values, policy)
1171
1172
1173def _collect_return_values(body, context):
1174 """Recursively collect return values from a list of AST statements."""
1175 return_values = []
1176 for stmt in body:
1177 if isinstance(stmt, ast.Return):
1178 if stmt.value is None:
1179 continue
1180 try:
1181 value = eval_node(stmt.value, context)
1182 if value is not None and value is not NOT_EVALUATED:
1183 return_values.append(value)
1184 except Exception:
1185 pass
1186 if isinstance(
1187 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda)
1188 ):
1189 continue
1190 elif hasattr(stmt, "body") and isinstance(stmt.body, list):
1191 return_values.extend(_collect_return_values(stmt.body, context))
1192 if isinstance(stmt, ast.Try):
1193 for h in stmt.handlers:
1194 if hasattr(h, "body"):
1195 return_values.extend(_collect_return_values(h.body, context))
1196 if hasattr(stmt, "orelse"):
1197 return_values.extend(_collect_return_values(stmt.orelse, context))
1198 if hasattr(stmt, "finalbody"):
1199 return_values.extend(_collect_return_values(stmt.finalbody, context))
1200 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list):
1201 return_values.extend(_collect_return_values(stmt.orelse, context))
1202 return return_values
1203
1204
1205def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
1206 """Evaluate return type of a given callable function.
1207
1208 Returns the built-in type, a duck or NOT_EVALUATED sentinel.
1209 """
1210 try:
1211 sig = signature(func)
1212 except ValueError:
1213 sig = UNKNOWN_SIGNATURE
1214 # if annotation was not stringized, or it was stringized
1215 # but resolved by signature call we know the return type
1216 not_empty = sig.return_annotation is not Signature.empty
1217 if not_empty:
1218 return _resolve_annotation(sig.return_annotation, context, sig, func, node)
1219 return NOT_EVALUATED
1220
1221
1222def _eval_annotation(
1223 annotation: str,
1224 context: EvaluationContext,
1225):
1226 return (
1227 _eval_node_name(annotation, context)
1228 if isinstance(annotation, str)
1229 else annotation
1230 )
1231
1232
1233class _GetItemDuck(dict):
1234 """A dict subclass that always returns the factory instance and claims to have any item."""
1235
1236 def __init__(self, factory, *args, **kwargs):
1237 super().__init__(*args, **kwargs)
1238 self._factory = factory
1239
1240 def __getitem__(self, key):
1241 return self._factory()
1242
1243 def __contains__(self, key):
1244 return True
1245
1246
1247def _resolve_annotation(
1248 annotation: object | str,
1249 context: EvaluationContext,
1250 sig: Signature | None = None,
1251 func: Callable | None = None,
1252 node: ast.Call | None = None,
1253):
1254 """Resolve annotation created by user with `typing` module and custom objects."""
1255 if annotation is None:
1256 return None
1257 annotation = _eval_annotation(annotation, context)
1258 origin = get_origin(annotation)
1259 if annotation is Self and func and hasattr(func, "__self__"):
1260 return func.__self__
1261 elif origin is Literal:
1262 type_args = get_args(annotation)
1263 if len(type_args) == 1:
1264 return type_args[0]
1265 elif annotation is LiteralString:
1266 return ""
1267 elif annotation is AnyStr:
1268 index = None
1269 if func and hasattr(func, "__node__"):
1270 def_node = func.__node__
1271 for i, arg in enumerate(def_node.args.args):
1272 if not arg.annotation:
1273 continue
1274 annotation = _eval_annotation(arg.annotation.id, context)
1275 if annotation is AnyStr:
1276 index = i
1277 break
1278 is_bound_method = (
1279 isinstance(func, MethodType) and getattr(func, "__self__") is not None
1280 )
1281 if index and is_bound_method:
1282 index -= 1
1283 elif sig:
1284 for i, (key, value) in enumerate(sig.parameters.items()):
1285 if value.annotation is AnyStr:
1286 index = i
1287 break
1288 if index is None:
1289 return None
1290 if index < 0 or index >= len(node.args):
1291 return None
1292 return eval_node(node.args[index], context)
1293 elif origin is TypeGuard:
1294 return False
1295 elif origin is set or origin is list:
1296 # only one type argument allowed
1297 attributes = [
1298 attr
1299 for attr in dir(
1300 _resolve_annotation(get_args(annotation)[0], context, sig, func, node)
1301 )
1302 ]
1303 duck = _Duck(attributes=dict.fromkeys(attributes))
1304 return _Duck(
1305 attributes=dict.fromkeys(dir(origin())),
1306 # items are not strrictly needed for set
1307 items=_GetItemDuck(lambda: duck),
1308 )
1309 elif origin is tuple:
1310 # multiple type arguments
1311 return tuple(
1312 _resolve_annotation(arg, context, sig, func, node)
1313 for arg in get_args(annotation)
1314 )
1315 elif origin is Union:
1316 # multiple type arguments
1317 attributes = [
1318 attr
1319 for type_arg in get_args(annotation)
1320 for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
1321 ]
1322 return _Duck(attributes=dict.fromkeys(attributes))
1323 elif is_typeddict(annotation):
1324 return _Duck(
1325 attributes=dict.fromkeys(dir(dict())),
1326 items={
1327 k: _resolve_annotation(v, context, sig, func, node)
1328 for k, v in annotation.__annotations__.items()
1329 },
1330 )
1331 elif hasattr(annotation, "_is_protocol"):
1332 return _Duck(attributes=dict.fromkeys(dir(annotation)))
1333 elif origin is Annotated:
1334 type_arg = get_args(annotation)[0]
1335 return _resolve_annotation(type_arg, context, sig, func, node)
1336 elif isinstance(annotation, NewType):
1337 return _eval_or_create_duck(annotation.__supertype__, context)
1338 elif isinstance(annotation, TypeAliasType):
1339 return _eval_or_create_duck(annotation.__value__, context)
1340 else:
1341 return _eval_or_create_duck(annotation, context)
1342
1343
1344def _eval_node_name(node_id: str, context: EvaluationContext):
1345 policy = get_policy(context)
1346 if node_id in context.transient_locals:
1347 return context.transient_locals[node_id]
1348 if policy.allow_locals_access and node_id in context.locals:
1349 return context.locals[node_id]
1350 if policy.allow_globals_access and node_id in context.globals:
1351 return context.globals[node_id]
1352 if policy.allow_builtins_access and hasattr(builtins, node_id):
1353 # note: do not use __builtins__, it is implementation detail of cPython
1354 return getattr(builtins, node_id)
1355 if policy.allow_auto_import and context.auto_import:
1356 return context.auto_import(node_id)
1357 if not policy.allow_globals_access and not policy.allow_locals_access:
1358 raise GuardRejection(
1359 f"Namespace access not allowed in {context.evaluation} mode"
1360 )
1361 else:
1362 raise NameError(f"{node_id} not found in locals, globals, nor builtins")
1363
1364
1365def _eval_or_create_duck(duck_type, context: EvaluationContext):
1366 policy = get_policy(context)
1367 # if allow-listed builtin is on type annotation, instantiate it
1368 if policy.can_call(duck_type):
1369 return duck_type()
1370 # if custom class is in type annotation, mock it
1371 return _create_duck_for_heap_type(duck_type)
1372
1373
1374def _create_duck_for_heap_type(duck_type):
1375 """Create an imitation of an object of a given type (a duck).
1376
1377 Returns the duck or NOT_EVALUATED sentinel if duck could not be created.
1378 """
1379 duck = ImpersonatingDuck()
1380 try:
1381 # this only works for heap types, not builtins
1382 duck.__class__ = duck_type
1383 return duck
1384 except TypeError:
1385 pass
1386 return NOT_EVALUATED
1387
1388
1389SUPPORTED_EXTERNAL_GETITEM = {
1390 ("pandas", "core", "indexing", "_iLocIndexer"),
1391 ("pandas", "core", "indexing", "_LocIndexer"),
1392 ("pandas", "DataFrame"),
1393 ("pandas", "Series"),
1394 ("numpy", "ndarray"),
1395 ("numpy", "void"),
1396}
1397
1398
1399BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
1400 dict,
1401 str, # type: ignore[arg-type]
1402 bytes, # type: ignore[arg-type]
1403 list,
1404 tuple,
1405 type, # for type annotations like list[str]
1406 _Duck,
1407 collections.defaultdict,
1408 collections.deque,
1409 collections.OrderedDict,
1410 collections.ChainMap,
1411 collections.UserDict,
1412 collections.UserList,
1413 collections.UserString, # type: ignore[arg-type]
1414 _DummyNamedTuple,
1415 _IdentitySubscript,
1416}
1417
1418
1419def _list_methods(cls, source=None):
1420 """For use on immutable objects or with methods returning a copy"""
1421 return [getattr(cls, k) for k in (source if source else dir(cls))]
1422
1423
1424dict_non_mutating_methods = ("copy", "keys", "values", "items")
1425list_non_mutating_methods = ("copy", "index", "count")
1426set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))
1427
1428
1429dict_keys: type[collections.abc.KeysView] = type({}.keys())
1430dict_values: type = type({}.values())
1431dict_items: type = type({}.items())
1432
1433NUMERICS = {int, float, complex}
1434
1435ALLOWED_CALLS = {
1436 bytes,
1437 *_list_methods(bytes),
1438 bytes.__iter__,
1439 dict,
1440 *_list_methods(dict, dict_non_mutating_methods),
1441 dict.__iter__,
1442 dict_keys.__iter__,
1443 dict_values.__iter__,
1444 dict_items.__iter__,
1445 dict_keys.isdisjoint,
1446 list,
1447 *_list_methods(list, list_non_mutating_methods),
1448 list.__iter__,
1449 set,
1450 *_list_methods(set, set_non_mutating_methods),
1451 set.__iter__,
1452 frozenset,
1453 *_list_methods(frozenset),
1454 frozenset.__iter__,
1455 range,
1456 range.__iter__,
1457 str,
1458 *_list_methods(str),
1459 str.__iter__,
1460 tuple,
1461 *_list_methods(tuple),
1462 tuple.__iter__,
1463 bool,
1464 *_list_methods(bool),
1465 *NUMERICS,
1466 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
1467 collections.deque,
1468 *_list_methods(collections.deque, list_non_mutating_methods),
1469 collections.deque.__iter__,
1470 collections.defaultdict,
1471 *_list_methods(collections.defaultdict, dict_non_mutating_methods),
1472 collections.defaultdict.__iter__,
1473 collections.OrderedDict,
1474 *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
1475 collections.OrderedDict.__iter__,
1476 collections.UserDict,
1477 *_list_methods(collections.UserDict, dict_non_mutating_methods),
1478 collections.UserDict.__iter__,
1479 collections.UserList,
1480 *_list_methods(collections.UserList, list_non_mutating_methods),
1481 collections.UserList.__iter__,
1482 collections.UserString,
1483 *_list_methods(collections.UserString, dir(str)),
1484 collections.UserString.__iter__,
1485 collections.Counter,
1486 *_list_methods(collections.Counter, dict_non_mutating_methods),
1487 collections.Counter.__iter__,
1488 collections.Counter.elements,
1489 collections.Counter.most_common,
1490 object.__dir__,
1491 type.__dir__,
1492 _Duck.__dir__,
1493}
1494
1495BUILTIN_GETATTR: set[MayHaveGetattr] = {
1496 *BUILTIN_GETITEM,
1497 set,
1498 frozenset,
1499 object,
1500 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
1501 *NUMERICS,
1502 dict_keys,
1503 MethodDescriptorType,
1504 ModuleType,
1505}
1506
1507
1508BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
1509
1510EVALUATION_POLICIES = {
1511 "minimal": EvaluationPolicy(
1512 allow_builtins_access=True,
1513 allow_locals_access=False,
1514 allow_globals_access=False,
1515 allow_item_access=False,
1516 allow_attr_access=False,
1517 allowed_calls=set(),
1518 allow_any_calls=False,
1519 allow_all_operations=False,
1520 ),
1521 "limited": SelectivePolicy(
1522 allowed_getitem=BUILTIN_GETITEM,
1523 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
1524 allowed_getattr=BUILTIN_GETATTR,
1525 allowed_getattr_external={
1526 # pandas Series/Frame implements custom `__getattr__`
1527 ("pandas", "DataFrame"),
1528 ("pandas", "Series"),
1529 },
1530 allowed_operations=BUILTIN_OPERATIONS,
1531 allow_builtins_access=True,
1532 allow_locals_access=True,
1533 allow_globals_access=True,
1534 allow_getitem_on_types=True,
1535 allowed_calls=ALLOWED_CALLS,
1536 ),
1537 "unsafe": EvaluationPolicy(
1538 allow_builtins_access=True,
1539 allow_locals_access=True,
1540 allow_globals_access=True,
1541 allow_attr_access=True,
1542 allow_item_access=True,
1543 allow_any_calls=True,
1544 allow_all_operations=True,
1545 ),
1546}
1547
1548
1549__all__ = [
1550 "guarded_eval",
1551 "eval_node",
1552 "GuardRejection",
1553 "EvaluationContext",
1554 "_unbind_method",
1555]