1from copy import copy
2from inspect import isclass, signature, Signature, getmodule
3from typing import (
4 Annotated,
5 AnyStr,
6 Callable,
7 Literal,
8 NamedTuple,
9 NewType,
10 Optional,
11 Protocol,
12 Sequence,
13 TypeGuard,
14 Union,
15 get_args,
16 get_origin,
17 is_typeddict,
18)
19import ast
20import builtins
21import collections
22import dataclasses
23import operator
24import sys
25import typing
26import warnings
27from functools import cached_property
28from dataclasses import dataclass, field
29from types import MethodDescriptorType, ModuleType, MethodType
30
31from IPython.utils.decorators import undoc
32
33
34from typing import Self, LiteralString
35
36if sys.version_info < (3, 12):
37 from typing_extensions import TypeAliasType
38else:
39 from typing import TypeAliasType
40
41
42@undoc
43class HasGetItem(Protocol):
44 def __getitem__(self, key) -> None: ...
45
46
47@undoc
48class InstancesHaveGetItem(Protocol):
49 def __call__(self, *args, **kwargs) -> HasGetItem: ...
50
51
52@undoc
53class HasGetAttr(Protocol):
54 def __getattr__(self, key) -> None: ...
55
56
57@undoc
58class DoesNotHaveGetAttr(Protocol):
59 pass
60
61
62# By default `__getattr__` is not explicitly implemented on most objects
63MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
64
65
66def _unbind_method(func: Callable) -> Union[Callable, None]:
67 """Get unbound method for given bound method.
68
69 Returns None if cannot get unbound method, or method is already unbound.
70 """
71 owner = getattr(func, "__self__", None)
72 owner_class = type(owner)
73 name = getattr(func, "__name__", None)
74 instance_dict_overrides = getattr(owner, "__dict__", None)
75 if (
76 owner is not None
77 and name
78 and (
79 not instance_dict_overrides
80 or (instance_dict_overrides and name not in instance_dict_overrides)
81 )
82 ):
83 return getattr(owner_class, name)
84 return None
85
86
87@undoc
88@dataclass
89class EvaluationPolicy:
90 """Definition of evaluation policy."""
91
92 allow_locals_access: bool = False
93 allow_globals_access: bool = False
94 allow_item_access: bool = False
95 allow_attr_access: bool = False
96 allow_builtins_access: bool = False
97 allow_all_operations: bool = False
98 allow_any_calls: bool = False
99 allow_auto_import: bool = False
100 allowed_calls: set[Callable] = field(default_factory=set)
101
102 def can_get_item(self, value, item):
103 return self.allow_item_access
104
105 def can_get_attr(self, value, attr):
106 return self.allow_attr_access
107
108 def can_operate(self, dunders: tuple[str, ...], a, b=None):
109 if self.allow_all_operations:
110 return True
111
112 def can_call(self, func):
113 if self.allow_any_calls:
114 return True
115
116 if func in self.allowed_calls:
117 return True
118
119 owner_method = _unbind_method(func)
120
121 if owner_method and owner_method in self.allowed_calls:
122 return True
123
124
125def _get_external(module_name: str, access_path: Sequence[str]):
126 """Get value from external module given a dotted access path.
127
128 Only gets value if the module is already imported.
129
130 Raises:
131 * `KeyError` if module is removed not found, and
132 * `AttributeError` if access path does not match an exported object
133 """
134 try:
135 member_type = sys.modules[module_name]
136 # standard module
137 for attr in access_path:
138 member_type = getattr(member_type, attr)
139 return member_type
140 except (KeyError, AttributeError):
141 # handle modules in namespace packages
142 module_path = ".".join([module_name, *access_path])
143 if module_path in sys.modules:
144 return sys.modules[module_path]
145 raise
146
147
148def _has_original_dunder_external(
149 value,
150 module_name: str,
151 access_path: Sequence[str],
152 method_name: str,
153):
154 if module_name not in sys.modules:
155 full_module_path = ".".join([module_name, *access_path])
156 if full_module_path not in sys.modules:
157 # LBYLB as it is faster
158 return False
159 try:
160 member_type = _get_external(module_name, access_path)
161 value_type = type(value)
162 if type(value) == member_type:
163 return True
164 if isinstance(member_type, ModuleType):
165 value_module = getmodule(value_type)
166 if not value_module or not value_module.__name__:
167 return False
168 if (
169 value_module.__name__ == member_type.__name__
170 or value_module.__name__.startswith(member_type.__name__ + ".")
171 ):
172 return True
173 if method_name == "__getattribute__":
174 # we have to short-circuit here due to an unresolved issue in
175 # `isinstance` implementation: https://bugs.python.org/issue32683
176 return False
177 if not isinstance(member_type, ModuleType) and isinstance(value, member_type):
178 method = getattr(value_type, method_name, None)
179 member_method = getattr(member_type, method_name, None)
180 if member_method == method:
181 return True
182 if isinstance(member_type, ModuleType):
183 method = getattr(value_type, method_name, None)
184 for base_class in value_type.__mro__[1:]:
185 base_module = getmodule(base_class)
186 if base_module and (
187 base_module.__name__ == member_type.__name__
188 or base_module.__name__.startswith(member_type.__name__ + ".")
189 ):
190 # Check if the method comes from this trusted base class
191 base_method = getattr(base_class, method_name, None)
192 if base_method is not None and base_method == method:
193 return True
194 except (AttributeError, KeyError):
195 return False
196
197
198def _has_original_dunder(
199 value, allowed_types, allowed_methods, allowed_external, method_name
200):
201 # note: Python ignores `__getattr__`/`__getitem__` on instances,
202 # we only need to check at class level
203 value_type = type(value)
204
205 # strict type check passes → no need to check method
206 if value_type in allowed_types:
207 return True
208
209 method = getattr(value_type, method_name, None)
210
211 if method is None:
212 return None
213
214 if method in allowed_methods:
215 return True
216
217 for module_name, *access_path in allowed_external:
218 if _has_original_dunder_external(value, module_name, access_path, method_name):
219 return True
220
221 return False
222
223
224def _coerce_path_to_tuples(
225 allow_list: set[tuple[str, ...] | str],
226) -> set[tuple[str, ...]]:
227 """Replace dotted paths on the provided allow-list with tuples."""
228 return {
229 path if isinstance(path, tuple) else tuple(path.split("."))
230 for path in allow_list
231 }
232
233
234@undoc
235@dataclass
236class SelectivePolicy(EvaluationPolicy):
237 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
238 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)
239
240 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
241 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)
242
243 allowed_operations: set = field(default_factory=set)
244 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)
245
246 allow_getitem_on_types: bool = field(default_factory=bool)
247
248 _operation_methods_cache: dict[str, set[Callable]] = field(
249 default_factory=dict, init=False
250 )
251
252 def can_get_attr(self, value, attr):
253 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)
254
255 has_original_attribute = _has_original_dunder(
256 value,
257 allowed_types=self.allowed_getattr,
258 allowed_methods=self._getattribute_methods,
259 allowed_external=allowed_getattr_external,
260 method_name="__getattribute__",
261 )
262 has_original_attr = _has_original_dunder(
263 value,
264 allowed_types=self.allowed_getattr,
265 allowed_methods=self._getattr_methods,
266 allowed_external=allowed_getattr_external,
267 method_name="__getattr__",
268 )
269
270 accept = False
271
272 # Many objects do not have `__getattr__`, this is fine.
273 if has_original_attr is None and has_original_attribute:
274 accept = True
275 else:
276 # Accept objects without modifications to `__getattr__` and `__getattribute__`
277 accept = has_original_attr and has_original_attribute
278
279 if accept:
280 # We still need to check for overridden properties.
281
282 value_class = type(value)
283 if not hasattr(value_class, attr):
284 return True
285
286 class_attr_val = getattr(value_class, attr)
287 is_property = isinstance(class_attr_val, property)
288
289 if not is_property:
290 return True
291
292 # Properties in allowed types are ok (although we do not include any
293 # properties in our default allow list currently).
294 if type(value) in self.allowed_getattr:
295 return True # pragma: no cover
296
297 # Properties in subclasses of allowed types may be ok if not changed
298 for module_name, *access_path in allowed_getattr_external:
299 try:
300 external_class = _get_external(module_name, access_path)
301 external_class_attr_val = getattr(external_class, attr)
302 except (KeyError, AttributeError):
303 return False # pragma: no cover
304 return class_attr_val == external_class_attr_val
305
306 return False
307
308 def can_get_item(self, value, item):
309 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified."""
310 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
311 if self.allow_getitem_on_types:
312 # e.g. Union[str, int] or Literal[True, 1]
313 if isinstance(value, (typing._SpecialForm, typing._BaseGenericAlias)):
314 return True
315 # PEP 560 e.g. list[str]
316 if isinstance(value, type) and hasattr(value, "__class_getitem__"):
317 return True
318 return _has_original_dunder(
319 value,
320 allowed_types=self.allowed_getitem,
321 allowed_methods=self._getitem_methods,
322 allowed_external=allowed_getitem_external,
323 method_name="__getitem__",
324 )
325
326 def can_operate(self, dunders: tuple[str, ...], a, b=None):
327 allowed_operations_external = _coerce_path_to_tuples(
328 self.allowed_operations_external
329 )
330 objects = [a]
331 if b is not None:
332 objects.append(b)
333 return all(
334 [
335 _has_original_dunder(
336 obj,
337 allowed_types=self.allowed_operations,
338 allowed_methods=self._operator_dunder_methods(dunder),
339 allowed_external=allowed_operations_external,
340 method_name=dunder,
341 )
342 for dunder in dunders
343 for obj in objects
344 ]
345 )
346
347 def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
348 if dunder not in self._operation_methods_cache:
349 self._operation_methods_cache[dunder] = self._safe_get_methods(
350 self.allowed_operations, dunder
351 )
352 return self._operation_methods_cache[dunder]
353
354 @cached_property
355 def _getitem_methods(self) -> set[Callable]:
356 return self._safe_get_methods(self.allowed_getitem, "__getitem__")
357
358 @cached_property
359 def _getattr_methods(self) -> set[Callable]:
360 return self._safe_get_methods(self.allowed_getattr, "__getattr__")
361
362 @cached_property
363 def _getattribute_methods(self) -> set[Callable]:
364 return self._safe_get_methods(self.allowed_getattr, "__getattribute__")
365
366 def _safe_get_methods(self, classes, name) -> set[Callable]:
367 return {
368 method
369 for class_ in classes
370 for method in [getattr(class_, name, None)]
371 if method
372 }
373
374
375class _DummyNamedTuple(NamedTuple):
376 """Used internally to retrieve methods of named tuple instance."""
377
378
379EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]
380
381
382@dataclass
383class EvaluationContext:
384 #: Local namespace
385 locals: dict
386 #: Global namespace
387 globals: dict
388 #: Evaluation policy identifier
389 evaluation: EvaluationPolicyName = "forbidden"
390 #: Whether the evaluation of code takes place inside of a subscript.
391 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
392 in_subscript: bool = False
393 #: Auto import method
394 auto_import: Callable[list[str], ModuleType] | None = None
395 #: Overrides for evaluation policy
396 policy_overrides: dict = field(default_factory=dict)
397 #: Transient local namespace used to store mocks
398 transient_locals: dict = field(default_factory=dict)
399 #: Transients of class level
400 class_transients: dict | None = None
401 #: Instance variable name used in the method definition
402 instance_arg_name: str | None = None
403
404 def replace(self, /, **changes):
405 """Return a new copy of the context, with specified changes"""
406 return dataclasses.replace(self, **changes)
407
408
409class _IdentitySubscript:
410 """Returns the key itself when item is requested via subscript."""
411
412 def __getitem__(self, key):
413 return key
414
415
416IDENTITY_SUBSCRIPT = _IdentitySubscript()
417SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
418UNKNOWN_SIGNATURE = Signature()
419NOT_EVALUATED = object()
420
421
422class GuardRejection(Exception):
423 """Exception raised when guard rejects evaluation attempt."""
424
425 pass
426
427
428def guarded_eval(code: str, context: EvaluationContext):
429 """Evaluate provided code in the evaluation context.
430
431 If evaluation policy given by context is set to ``forbidden``
432 no evaluation will be performed; if it is set to ``dangerous``
433 standard :func:`eval` will be used; finally, for any other,
434 policy :func:`eval_node` will be called on parsed AST.
435 """
436 locals_ = context.locals
437
438 if context.evaluation == "forbidden":
439 raise GuardRejection("Forbidden mode")
440
441 # note: not using `ast.literal_eval` as it does not implement
442 # getitem at all, for example it fails on simple `[0][1]`
443
444 if context.in_subscript:
445 # syntactic sugar for ellipsis (:) is only available in subscripts
446 # so we need to trick the ast parser into thinking that we have
447 # a subscript, but we need to be able to later recognise that we did
448 # it so we can ignore the actual __getitem__ operation
449 if not code:
450 return tuple()
451 locals_ = locals_.copy()
452 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
453 code = SUBSCRIPT_MARKER + "[" + code + "]"
454 context = context.replace(locals=locals_)
455
456 if context.evaluation == "dangerous":
457 return eval(code, context.globals, context.locals)
458
459 node = ast.parse(code, mode="exec")
460
461 return eval_node(node, context)
462
463
464BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
465 ast.Add: ("__add__",),
466 ast.Sub: ("__sub__",),
467 ast.Mult: ("__mul__",),
468 ast.Div: ("__truediv__",),
469 ast.FloorDiv: ("__floordiv__",),
470 ast.Mod: ("__mod__",),
471 ast.Pow: ("__pow__",),
472 ast.LShift: ("__lshift__",),
473 ast.RShift: ("__rshift__",),
474 ast.BitOr: ("__or__",),
475 ast.BitXor: ("__xor__",),
476 ast.BitAnd: ("__and__",),
477 ast.MatMult: ("__matmul__",),
478}
479
480COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
481 ast.Eq: ("__eq__",),
482 ast.NotEq: ("__ne__", "__eq__"),
483 ast.Lt: ("__lt__", "__gt__"),
484 ast.LtE: ("__le__", "__ge__"),
485 ast.Gt: ("__gt__", "__lt__"),
486 ast.GtE: ("__ge__", "__le__"),
487 ast.In: ("__contains__",),
488 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
489}
490
491UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
492 ast.USub: ("__neg__",),
493 ast.UAdd: ("__pos__",),
494 # we have to check both __inv__ and __invert__!
495 ast.Invert: ("__invert__", "__inv__"),
496 ast.Not: ("__not__",),
497}
498
499
500class ImpersonatingDuck:
501 """A dummy class used to create objects of other classes without calling their ``__init__``"""
502
503 # no-op: override __class__ to impersonate
504
505
506class _Duck:
507 """A dummy class used to create objects pretending to have given attributes"""
508
509 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None):
510 self.attributes = attributes if attributes is not None else {}
511 self.items = items if items is not None else {}
512
513 def __getattr__(self, attr: str):
514 return self.attributes[attr]
515
516 def __hasattr__(self, attr: str):
517 return attr in self.attributes
518
519 def __dir__(self):
520 return [*dir(super), *self.attributes]
521
522 def __getitem__(self, key: str):
523 return self.items[key]
524
525 def __hasitem__(self, key: str):
526 return self.items[key]
527
528 def _ipython_key_completions_(self):
529 return self.items.keys()
530
531
532def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]:
533 dunder = None
534 for op, candidate_dunder in dunders.items():
535 if isinstance(node_op, op):
536 dunder = candidate_dunder
537 return dunder
538
539
540def get_policy(context: EvaluationContext) -> EvaluationPolicy:
541 policy = copy(EVALUATION_POLICIES[context.evaluation])
542
543 for key, value in context.policy_overrides.items():
544 if hasattr(policy, key):
545 setattr(policy, key, value)
546 return policy
547
548
549def _validate_policy_overrides(
550 policy_name: EvaluationPolicyName, policy_overrides: dict
551) -> bool:
552 policy = EVALUATION_POLICIES[policy_name]
553
554 all_good = True
555 for key, value in policy_overrides.items():
556 if not hasattr(policy, key):
557 warnings.warn(
558 f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
559 )
560 all_good = False
561 return all_good
562
563
564def _handle_assign(node: ast.Assign, context: EvaluationContext):
565 value = eval_node(node.value, context)
566 transient_locals = context.transient_locals
567 policy = get_policy(context)
568 class_transients = context.class_transients
569 for target in node.targets:
570 if isinstance(target, (ast.Tuple, ast.List)):
571 # Handle unpacking assignment
572 values = list(value)
573 targets = target.elts
574 starred = [i for i, t in enumerate(targets) if isinstance(t, ast.Starred)]
575
576 # Unified handling: treat no starred as starred at end
577 star_or_last_idx = starred[0] if starred else len(targets)
578
579 # Before starred
580 for i in range(star_or_last_idx):
581 # Check for self.x assignment
582 if _is_instance_attribute_assignment(targets[i], context):
583 class_transients[targets[i].attr] = values[i]
584 else:
585 transient_locals[targets[i].id] = values[i]
586
587 # Starred if exists
588 if starred:
589 end = len(values) - (len(targets) - star_or_last_idx - 1)
590 if _is_instance_attribute_assignment(
591 targets[star_or_last_idx], context
592 ):
593 class_transients[targets[star_or_last_idx].attr] = values[
594 star_or_last_idx:end
595 ]
596 else:
597 transient_locals[targets[star_or_last_idx].value.id] = values[
598 star_or_last_idx:end
599 ]
600
601 # After starred
602 for i in range(star_or_last_idx + 1, len(targets)):
603 if _is_instance_attribute_assignment(targets[i], context):
604 class_transients[targets[i].attr] = values[
605 len(values) - (len(targets) - i)
606 ]
607 else:
608 transient_locals[targets[i].id] = values[
609 len(values) - (len(targets) - i)
610 ]
611 elif isinstance(target, ast.Subscript):
612 if isinstance(target.value, ast.Name):
613 name = target.value.id
614 container = transient_locals.get(name)
615 if container is None:
616 container = context.locals.get(name)
617 if container is None:
618 container = context.globals.get(name)
619 if container is None:
620 raise NameError(
621 f"{name} not found in locals, globals, nor builtins"
622 )
623 storage_dict = transient_locals
624 storage_key = name
625 elif isinstance(
626 target.value, ast.Attribute
627 ) and _is_instance_attribute_assignment(target.value, context):
628 attr = target.value.attr
629 container = class_transients.get(attr, None)
630 if container is None:
631 raise NameError(f"{attr} not found in class transients")
632 storage_dict = class_transients
633 storage_key = attr
634 else:
635 return
636
637 key = eval_node(target.slice, context)
638 attributes = (
639 dict.fromkeys(dir(container))
640 if policy.can_call(container.__dir__)
641 else {}
642 )
643 items = {}
644
645 if policy.can_get_item(container, None):
646 try:
647 items = dict(container.items())
648 except Exception:
649 pass
650
651 items[key] = value
652 duck_container = _Duck(attributes=attributes, items=items)
653 storage_dict[storage_key] = duck_container
654 elif _is_instance_attribute_assignment(target, context):
655 class_transients[target.attr] = value
656 else:
657 transient_locals[target.id] = value
658 return None
659
660
661def _extract_args_and_kwargs(node: ast.Call, context: EvaluationContext):
662 args = [eval_node(arg, context) for arg in node.args]
663 kwargs = {
664 k: v
665 for kw in node.keywords
666 for k, v in (
667 {kw.arg: eval_node(kw.value, context)}
668 if kw.arg
669 else eval_node(kw.value, context)
670 ).items()
671 }
672 return args, kwargs
673
674
675def _is_instance_attribute_assignment(
676 target: ast.AST, context: EvaluationContext
677) -> bool:
678 """Return True if target is an attribute access on the instance argument."""
679 return (
680 context.class_transients is not None
681 and context.instance_arg_name is not None
682 and isinstance(target, ast.Attribute)
683 and isinstance(getattr(target, "value", None), ast.Name)
684 and getattr(target.value, "id", None) == context.instance_arg_name
685 )
686
687
688def _get_coroutine_attributes() -> dict[str, Optional[object]]:
689 async def _dummy():
690 return None
691
692 coro = _dummy()
693 try:
694 return {attr: getattr(coro, attr, None) for attr in dir(coro)}
695 finally:
696 coro.close()
697
698
699def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
700 """Evaluate AST node in provided context.
701
702 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.
703
704 Does not evaluate actions that always have side effects:
705
706 - class definitions (``class sth: ...``)
707 - function definitions (``def sth: ...``)
708 - variable assignments (``x = 1``)
709 - augmented assignments (``x += 1``)
710 - deletions (``del x``)
711
712 Does not evaluate operations which do not return values:
713
714 - assertions (``assert x``)
715 - pass (``pass``)
716 - imports (``import x``)
717 - control flow:
718
719 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
720 - loops (``for`` and ``while``)
721 - exception handling
722
723 The purpose of this function is to guard against unwanted side-effects;
724 it does not give guarantees on protection from malicious code execution.
725 """
726 policy = get_policy(context)
727
728 if node is None:
729 return None
730 if isinstance(node, (ast.Interactive, ast.Module)):
731 result = None
732 for child_node in node.body:
733 result = eval_node(child_node, context)
734 return result
735 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
736 is_async = isinstance(node, ast.AsyncFunctionDef)
737 func_locals = context.transient_locals.copy()
738 func_context = context.replace(transient_locals=func_locals)
739 is_property = False
740 is_static = False
741 is_classmethod = False
742 for decorator_node in node.decorator_list:
743 try:
744 decorator = eval_node(decorator_node, context)
745 except NameError:
746 # if the decorator is not yet defined this is fine
747 # especialy because we don't handle imports yet
748 continue
749 if decorator is property:
750 is_property = True
751 elif decorator is staticmethod:
752 is_static = True
753 elif decorator is classmethod:
754 is_classmethod = True
755
756 if func_context.class_transients is not None:
757 if not is_static and not is_classmethod:
758 func_context.instance_arg_name = (
759 node.args.args[0].arg if node.args.args else None
760 )
761
762 return_type = eval_node(node.returns, context=context)
763
764 for child_node in node.body:
765 eval_node(child_node, func_context)
766
767 if is_property:
768 if return_type is not None:
769 context.transient_locals[node.name] = _resolve_annotation(
770 return_type, context
771 )
772 else:
773 return_value = _infer_return_value(node, func_context)
774 context.transient_locals[node.name] = return_value
775
776 return None
777
778 def dummy_function(*args, **kwargs):
779 pass
780
781 if return_type is not None:
782 dummy_function.__annotations__["return"] = return_type
783 else:
784 inferred_return = _infer_return_value(node, func_context)
785 if inferred_return is not None:
786 dummy_function.__inferred_return__ = inferred_return
787
788 dummy_function.__name__ = node.name
789 dummy_function.__node__ = node
790 dummy_function.__is_async__ = is_async
791 context.transient_locals[node.name] = dummy_function
792 return None
793 if isinstance(node, ast.Lambda):
794
795 def dummy_function(*args, **kwargs):
796 pass
797
798 dummy_function.__inferred_return__ = eval_node(node.body, context)
799 return dummy_function
800 if isinstance(node, ast.ClassDef):
801 # TODO support class decorators?
802 class_locals = {}
803 outer_locals = context.locals.copy()
804 outer_locals.update(context.transient_locals)
805 class_context = context.replace(
806 transient_locals=class_locals, locals=outer_locals
807 )
808 class_context.class_transients = class_locals
809 for child_node in node.body:
810 eval_node(child_node, class_context)
811 bases = tuple([eval_node(base, context) for base in node.bases])
812 dummy_class = type(node.name, bases, class_locals)
813 context.transient_locals[node.name] = dummy_class
814 return None
815 if isinstance(node, ast.Await):
816 value = eval_node(node.value, context)
817 if hasattr(value, "__awaited_type__"):
818 return value.__awaited_type__
819 return value
820 if isinstance(node, ast.While):
821 loop_locals = context.transient_locals.copy()
822 loop_context = context.replace(transient_locals=loop_locals)
823
824 result = None
825 for stmt in node.body:
826 result = eval_node(stmt, loop_context)
827
828 policy = get_policy(context)
829 merged_locals = _merge_dicts_by_key(
830 [loop_locals, context.transient_locals.copy()], policy
831 )
832 context.transient_locals.update(merged_locals)
833
834 return result
835 if isinstance(node, ast.For):
836 try:
837 iterable = eval_node(node.iter, context)
838 except Exception:
839 iterable = None
840
841 sample = None
842 if iterable is not None:
843 try:
844 if policy.can_call(getattr(iterable, "__iter__", None)):
845 sample = next(iter(iterable))
846 except Exception:
847 sample = None
848
849 loop_locals = context.transient_locals.copy()
850 loop_context = context.replace(transient_locals=loop_locals)
851
852 if sample is not None:
853 try:
854 fake_assign = ast.Assign(
855 targets=[node.target], value=ast.Constant(value=sample)
856 )
857 _handle_assign(fake_assign, loop_context)
858 except Exception:
859 pass
860
861 result = None
862 for stmt in node.body:
863 result = eval_node(stmt, loop_context)
864
865 policy = get_policy(context)
866 merged_locals = _merge_dicts_by_key(
867 [loop_locals, context.transient_locals.copy()], policy
868 )
869 context.transient_locals.update(merged_locals)
870
871 return result
872 if isinstance(node, ast.If):
873 branches = []
874 current = node
875 result = None
876 while True:
877 branch_locals = context.transient_locals.copy()
878 branch_context = context.replace(transient_locals=branch_locals)
879 for stmt in current.body:
880 result = eval_node(stmt, branch_context)
881 branches.append(branch_locals)
882 if not current.orelse:
883 break
884 elif len(current.orelse) == 1 and isinstance(current.orelse[0], ast.If):
885 # It's an elif - continue loop
886 current = current.orelse[0]
887 else:
888 # It's an else block - process and break
889 else_locals = context.transient_locals.copy()
890 else_context = context.replace(transient_locals=else_locals)
891 for stmt in current.orelse:
892 result = eval_node(stmt, else_context)
893 branches.append(else_locals)
894 break
895 branches.append(context.transient_locals.copy())
896 policy = get_policy(context)
897 merged_locals = _merge_dicts_by_key(branches, policy)
898 context.transient_locals.update(merged_locals)
899 return result
900 if isinstance(node, ast.Assign):
901 return _handle_assign(node, context)
902 if isinstance(node, ast.AnnAssign):
903 if node.simple:
904 value = _resolve_annotation(eval_node(node.annotation, context), context)
905 context.transient_locals[node.target.id] = value
906 # Handle non-simple annotated assignments only for self.x: type = value
907 if _is_instance_attribute_assignment(node.target, context):
908 value = _resolve_annotation(eval_node(node.annotation, context), context)
909 context.class_transients[node.target.attr] = value
910 return None
911 if isinstance(node, ast.Expression):
912 return eval_node(node.body, context)
913 if isinstance(node, ast.Expr):
914 return eval_node(node.value, context)
915 if isinstance(node, ast.Pass):
916 return None
917 if isinstance(node, ast.Import):
918 # TODO: populate transient_locals
919 return None
920 if isinstance(node, (ast.AugAssign, ast.Delete)):
921 return None
922 if isinstance(node, (ast.Global, ast.Nonlocal)):
923 return None
924 if isinstance(node, ast.BinOp):
925 left = eval_node(node.left, context)
926 right = eval_node(node.right, context)
927 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
928 if dunders:
929 if policy.can_operate(dunders, left, right):
930 return getattr(left, dunders[0])(right)
931 else:
932 raise GuardRejection(
933 f"Operation (`{dunders}`) for",
934 type(left),
935 f"not allowed in {context.evaluation} mode",
936 )
937 if isinstance(node, ast.Compare):
938 left = eval_node(node.left, context)
939 all_true = True
940 negate = False
941 for op, right in zip(node.ops, node.comparators):
942 right = eval_node(right, context)
943 dunder = None
944 dunders = _find_dunder(op, COMP_OP_DUNDERS)
945 if not dunders:
946 if isinstance(op, ast.NotIn):
947 dunders = COMP_OP_DUNDERS[ast.In]
948 negate = True
949 if isinstance(op, ast.Is):
950 dunder = "is_"
951 if isinstance(op, ast.IsNot):
952 dunder = "is_"
953 negate = True
954 if not dunder and dunders:
955 dunder = dunders[0]
956 if dunder:
957 a, b = (right, left) if dunder == "__contains__" else (left, right)
958 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
959 result = getattr(operator, dunder)(a, b)
960 if negate:
961 result = not result
962 if not result:
963 all_true = False
964 left = right
965 else:
966 raise GuardRejection(
967 f"Comparison (`{dunder}`) for",
968 type(left),
969 f"not allowed in {context.evaluation} mode",
970 )
971 else:
972 raise ValueError(
973 f"Comparison `{dunder}` not supported"
974 ) # pragma: no cover
975 return all_true
976 if isinstance(node, ast.Constant):
977 return node.value
978 if isinstance(node, ast.Tuple):
979 return tuple(eval_node(e, context) for e in node.elts)
980 if isinstance(node, ast.List):
981 return [eval_node(e, context) for e in node.elts]
982 if isinstance(node, ast.Set):
983 return {eval_node(e, context) for e in node.elts}
984 if isinstance(node, ast.Dict):
985 return dict(
986 zip(
987 [eval_node(k, context) for k in node.keys],
988 [eval_node(v, context) for v in node.values],
989 )
990 )
991 if isinstance(node, ast.Slice):
992 return slice(
993 eval_node(node.lower, context),
994 eval_node(node.upper, context),
995 eval_node(node.step, context),
996 )
997 if isinstance(node, ast.UnaryOp):
998 value = eval_node(node.operand, context)
999 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
1000 if dunders:
1001 if policy.can_operate(dunders, value):
1002 try:
1003 return getattr(value, dunders[0])()
1004 except AttributeError:
1005 raise TypeError(
1006 f"bad operand type for unary {node.op}: {type(value)}"
1007 )
1008 else:
1009 raise GuardRejection(
1010 f"Operation (`{dunders}`) for",
1011 type(value),
1012 f"not allowed in {context.evaluation} mode",
1013 )
1014 if isinstance(node, ast.Subscript):
1015 value = eval_node(node.value, context)
1016 slice_ = eval_node(node.slice, context)
1017 if policy.can_get_item(value, slice_):
1018 return value[slice_]
1019 raise GuardRejection(
1020 "Subscript access (`__getitem__`) for",
1021 type(value), # not joined to avoid calling `repr`
1022 f" not allowed in {context.evaluation} mode",
1023 )
1024 if isinstance(node, ast.Name):
1025 return _eval_node_name(node.id, context)
1026 if isinstance(node, ast.Attribute):
1027 if (
1028 context.class_transients is not None
1029 and isinstance(node.value, ast.Name)
1030 and node.value.id == context.instance_arg_name
1031 ):
1032 return context.class_transients.get(node.attr)
1033 value = eval_node(node.value, context)
1034 if policy.can_get_attr(value, node.attr):
1035 return getattr(value, node.attr)
1036 raise GuardRejection(
1037 "Attribute access (`__getattr__`) for",
1038 type(value), # not joined to avoid calling `repr`
1039 f"not allowed in {context.evaluation} mode",
1040 )
1041 if isinstance(node, ast.IfExp):
1042 test = eval_node(node.test, context)
1043 if test:
1044 return eval_node(node.body, context)
1045 else:
1046 return eval_node(node.orelse, context)
1047 if isinstance(node, ast.Call):
1048 func = eval_node(node.func, context)
1049 if policy.can_call(func):
1050 args, kwargs = _extract_args_and_kwargs(node, context)
1051 return func(*args, **kwargs)
1052 if isclass(func):
1053 # this code path gets entered when calling class e.g. `MyClass()`
1054 # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
1055 # Should return `MyClass` if `__new__` is not overridden,
1056 # otherwise whatever `__new__` return type is.
1057 overridden_return_type = _eval_return_type(func.__new__, node, context)
1058 if overridden_return_type is not NOT_EVALUATED:
1059 return overridden_return_type
1060 return _create_duck_for_heap_type(func)
1061 else:
1062 inferred_return = getattr(func, "__inferred_return__", NOT_EVALUATED)
1063 return_type = _eval_return_type(func, node, context)
1064 if getattr(func, "__is_async__", False):
1065 awaited_type = (
1066 inferred_return if inferred_return is not None else return_type
1067 )
1068 coroutine_duck = _Duck(attributes=_get_coroutine_attributes())
1069 coroutine_duck.__awaited_type__ = awaited_type
1070 return coroutine_duck
1071 if inferred_return is not NOT_EVALUATED:
1072 return inferred_return
1073 if return_type is not NOT_EVALUATED:
1074 return return_type
1075 raise GuardRejection(
1076 "Call for",
1077 func, # not joined to avoid calling `repr`
1078 f"not allowed in {context.evaluation} mode",
1079 )
1080 if isinstance(node, ast.Assert):
1081 # message is always the second item, so if it is defined user would be completing
1082 # on the message, not on the assertion test
1083 if node.msg:
1084 return eval_node(node.msg, context)
1085 return eval_node(node.test, context)
1086 return None
1087
1088
1089def _merge_dicts_by_key(dicts: list, policy: EvaluationPolicy):
1090 """Merge multiple dictionaries, combining values for each key."""
1091 if len(dicts) == 1:
1092 return dicts[0]
1093
1094 all_keys = set()
1095 for d in dicts:
1096 all_keys.update(d.keys())
1097
1098 merged = {}
1099 for key in all_keys:
1100 values = [d[key] for d in dicts if key in d]
1101 if values:
1102 merged[key] = _merge_values(values, policy)
1103
1104 return merged
1105
1106
1107def _merge_values(values, policy: EvaluationPolicy):
1108 """Recursively merge multiple values, combining attributes and dict items."""
1109 if len(values) == 1:
1110 return values[0]
1111
1112 types = {type(v) for v in values}
1113 merged_items = None
1114 key_values = {}
1115 attributes = set()
1116 for v in values:
1117 if policy.can_call(v.__dir__):
1118 attributes.update(dir(v))
1119 try:
1120 if policy.can_call(v.items):
1121 try:
1122 for k, val in v.items():
1123 key_values.setdefault(k, []).append(val)
1124 except Exception as e:
1125 pass
1126 elif policy.can_call(v.keys):
1127 try:
1128 for k in v.keys():
1129 key_values.setdefault(k, []).append(None)
1130 except Exception as e:
1131 pass
1132 except Exception as e:
1133 pass
1134
1135 if key_values:
1136 merged_items = {
1137 k: _merge_values(vals, policy) if vals[0] is not None else None
1138 for k, vals in key_values.items()
1139 }
1140
1141 if len(types) == 1:
1142 t = next(iter(types))
1143 if t not in (dict,) and not (
1144 hasattr(next(iter(values)), "__getitem__")
1145 and (
1146 hasattr(next(iter(values)), "items")
1147 or hasattr(next(iter(values)), "keys")
1148 )
1149 ):
1150 if t in (list, set, tuple):
1151 return t
1152 return values[0]
1153
1154 return _Duck(attributes=dict.fromkeys(attributes), items=merged_items)
1155
1156
1157def _infer_return_value(node: ast.FunctionDef, context: EvaluationContext):
1158 """Infer the return value(s) of a function by evaluating all return statements."""
1159 return_values = _collect_return_values(node.body, context)
1160
1161 if not return_values:
1162 return None
1163 if len(return_values) == 1:
1164 return return_values[0]
1165
1166 policy = get_policy(context)
1167 return _merge_values(return_values, policy)
1168
1169
1170def _collect_return_values(body, context):
1171 """Recursively collect return values from a list of AST statements."""
1172 return_values = []
1173 for stmt in body:
1174 if isinstance(stmt, ast.Return):
1175 if stmt.value is None:
1176 continue
1177 try:
1178 value = eval_node(stmt.value, context)
1179 if value is not None and value is not NOT_EVALUATED:
1180 return_values.append(value)
1181 except Exception:
1182 pass
1183 if isinstance(
1184 stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Lambda)
1185 ):
1186 continue
1187 elif hasattr(stmt, "body") and isinstance(stmt.body, list):
1188 return_values.extend(_collect_return_values(stmt.body, context))
1189 if isinstance(stmt, ast.Try):
1190 for h in stmt.handlers:
1191 if hasattr(h, "body"):
1192 return_values.extend(_collect_return_values(h.body, context))
1193 if hasattr(stmt, "orelse"):
1194 return_values.extend(_collect_return_values(stmt.orelse, context))
1195 if hasattr(stmt, "finalbody"):
1196 return_values.extend(_collect_return_values(stmt.finalbody, context))
1197 if hasattr(stmt, "orelse") and isinstance(stmt.orelse, list):
1198 return_values.extend(_collect_return_values(stmt.orelse, context))
1199 return return_values
1200
1201
1202def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
1203 """Evaluate return type of a given callable function.
1204
1205 Returns the built-in type, a duck or NOT_EVALUATED sentinel.
1206 """
1207 try:
1208 sig = signature(func)
1209 except ValueError:
1210 sig = UNKNOWN_SIGNATURE
1211 # if annotation was not stringized, or it was stringized
1212 # but resolved by signature call we know the return type
1213 not_empty = sig.return_annotation is not Signature.empty
1214 if not_empty:
1215 return _resolve_annotation(sig.return_annotation, context, sig, func, node)
1216 return NOT_EVALUATED
1217
1218
1219def _eval_annotation(
1220 annotation: str,
1221 context: EvaluationContext,
1222):
1223 return (
1224 _eval_node_name(annotation, context)
1225 if isinstance(annotation, str)
1226 else annotation
1227 )
1228
1229
1230class _GetItemDuck(dict):
1231 """A dict subclass that always returns the factory instance and claims to have any item."""
1232
1233 def __init__(self, factory, *args, **kwargs):
1234 super().__init__(*args, **kwargs)
1235 self._factory = factory
1236
1237 def __getitem__(self, key):
1238 return self._factory()
1239
1240 def __contains__(self, key):
1241 return True
1242
1243
1244def _resolve_annotation(
1245 annotation: object | str,
1246 context: EvaluationContext,
1247 sig: Signature | None = None,
1248 func: Callable | None = None,
1249 node: ast.Call | None = None,
1250):
1251 """Resolve annotation created by user with `typing` module and custom objects."""
1252 if annotation is None:
1253 return None
1254 annotation = _eval_annotation(annotation, context)
1255 origin = get_origin(annotation)
1256 if annotation is Self and func and hasattr(func, "__self__"):
1257 return func.__self__
1258 elif origin is Literal:
1259 type_args = get_args(annotation)
1260 if len(type_args) == 1:
1261 return type_args[0]
1262 elif annotation is LiteralString:
1263 return ""
1264 elif annotation is AnyStr:
1265 index = None
1266 if func and hasattr(func, "__node__"):
1267 def_node = func.__node__
1268 for i, arg in enumerate(def_node.args.args):
1269 if not arg.annotation:
1270 continue
1271 annotation = _eval_annotation(arg.annotation.id, context)
1272 if annotation is AnyStr:
1273 index = i
1274 break
1275 is_bound_method = (
1276 isinstance(func, MethodType) and getattr(func, "__self__") is not None
1277 )
1278 if index and is_bound_method:
1279 index -= 1
1280 elif sig:
1281 for i, (key, value) in enumerate(sig.parameters.items()):
1282 if value.annotation is AnyStr:
1283 index = i
1284 break
1285 if index is None:
1286 return None
1287 if index < 0 or index >= len(node.args):
1288 return None
1289 return eval_node(node.args[index], context)
1290 elif origin is TypeGuard:
1291 return False
1292 elif origin is set or origin is list:
1293 # only one type argument allowed
1294 attributes = [
1295 attr
1296 for attr in dir(
1297 _resolve_annotation(get_args(annotation)[0], context, sig, func, node)
1298 )
1299 ]
1300 duck = _Duck(attributes=dict.fromkeys(attributes))
1301 return _Duck(
1302 attributes=dict.fromkeys(dir(origin())),
1303 # items are not strrictly needed for set
1304 items=_GetItemDuck(lambda: duck),
1305 )
1306 elif origin is tuple:
1307 # multiple type arguments
1308 return tuple(
1309 _resolve_annotation(arg, context, sig, func, node)
1310 for arg in get_args(annotation)
1311 )
1312 elif origin is Union:
1313 # multiple type arguments
1314 attributes = [
1315 attr
1316 for type_arg in get_args(annotation)
1317 for attr in dir(_resolve_annotation(type_arg, context, sig, func, node))
1318 ]
1319 return _Duck(attributes=dict.fromkeys(attributes))
1320 elif is_typeddict(annotation):
1321 return _Duck(
1322 attributes=dict.fromkeys(dir(dict())),
1323 items={
1324 k: _resolve_annotation(v, context, sig, func, node)
1325 for k, v in annotation.__annotations__.items()
1326 },
1327 )
1328 elif hasattr(annotation, "_is_protocol"):
1329 return _Duck(attributes=dict.fromkeys(dir(annotation)))
1330 elif origin is Annotated:
1331 type_arg = get_args(annotation)[0]
1332 return _resolve_annotation(type_arg, context, sig, func, node)
1333 elif isinstance(annotation, NewType):
1334 return _eval_or_create_duck(annotation.__supertype__, context)
1335 elif isinstance(annotation, TypeAliasType):
1336 return _eval_or_create_duck(annotation.__value__, context)
1337 else:
1338 return _eval_or_create_duck(annotation, context)
1339
1340
1341def _eval_node_name(node_id: str, context: EvaluationContext):
1342 policy = get_policy(context)
1343 if node_id in context.transient_locals:
1344 return context.transient_locals[node_id]
1345 if policy.allow_locals_access and node_id in context.locals:
1346 return context.locals[node_id]
1347 if policy.allow_globals_access and node_id in context.globals:
1348 return context.globals[node_id]
1349 if policy.allow_builtins_access and hasattr(builtins, node_id):
1350 # note: do not use __builtins__, it is implementation detail of cPython
1351 return getattr(builtins, node_id)
1352 if policy.allow_auto_import and context.auto_import:
1353 return context.auto_import(node_id)
1354 if not policy.allow_globals_access and not policy.allow_locals_access:
1355 raise GuardRejection(
1356 f"Namespace access not allowed in {context.evaluation} mode"
1357 )
1358 else:
1359 raise NameError(f"{node_id} not found in locals, globals, nor builtins")
1360
1361
1362def _eval_or_create_duck(duck_type, context: EvaluationContext):
1363 policy = get_policy(context)
1364 # if allow-listed builtin is on type annotation, instantiate it
1365 if policy.can_call(duck_type):
1366 return duck_type()
1367 # if custom class is in type annotation, mock it
1368 return _create_duck_for_heap_type(duck_type)
1369
1370
1371def _create_duck_for_heap_type(duck_type):
1372 """Create an imitation of an object of a given type (a duck).
1373
1374 Returns the duck or NOT_EVALUATED sentinel if duck could not be created.
1375 """
1376 duck = ImpersonatingDuck()
1377 try:
1378 # this only works for heap types, not builtins
1379 duck.__class__ = duck_type
1380 return duck
1381 except TypeError:
1382 pass
1383 return NOT_EVALUATED
1384
1385
1386SUPPORTED_EXTERNAL_GETITEM = {
1387 ("pandas", "core", "indexing", "_iLocIndexer"),
1388 ("pandas", "core", "indexing", "_LocIndexer"),
1389 ("pandas", "DataFrame"),
1390 ("pandas", "Series"),
1391 ("numpy", "ndarray"),
1392 ("numpy", "void"),
1393}
1394
1395
1396BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
1397 dict,
1398 str, # type: ignore[arg-type]
1399 bytes, # type: ignore[arg-type]
1400 list,
1401 tuple,
1402 type, # for type annotations like list[str]
1403 _Duck,
1404 collections.defaultdict,
1405 collections.deque,
1406 collections.OrderedDict,
1407 collections.ChainMap,
1408 collections.UserDict,
1409 collections.UserList,
1410 collections.UserString, # type: ignore[arg-type]
1411 _DummyNamedTuple,
1412 _IdentitySubscript,
1413}
1414
1415
1416def _list_methods(cls, source=None):
1417 """For use on immutable objects or with methods returning a copy"""
1418 return [getattr(cls, k) for k in (source if source else dir(cls))]
1419
1420
1421dict_non_mutating_methods = ("copy", "keys", "values", "items")
1422list_non_mutating_methods = ("copy", "index", "count")
1423set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))
1424
1425
1426dict_keys: type[collections.abc.KeysView] = type({}.keys())
1427dict_values: type = type({}.values())
1428dict_items: type = type({}.items())
1429
1430NUMERICS = {int, float, complex}
1431
1432ALLOWED_CALLS = {
1433 bytes,
1434 *_list_methods(bytes),
1435 bytes.__iter__,
1436 dict,
1437 *_list_methods(dict, dict_non_mutating_methods),
1438 dict.__iter__,
1439 dict_keys.__iter__,
1440 dict_values.__iter__,
1441 dict_items.__iter__,
1442 dict_keys.isdisjoint,
1443 list,
1444 *_list_methods(list, list_non_mutating_methods),
1445 list.__iter__,
1446 set,
1447 *_list_methods(set, set_non_mutating_methods),
1448 set.__iter__,
1449 frozenset,
1450 *_list_methods(frozenset),
1451 frozenset.__iter__,
1452 range,
1453 range.__iter__,
1454 str,
1455 *_list_methods(str),
1456 str.__iter__,
1457 tuple,
1458 *_list_methods(tuple),
1459 tuple.__iter__,
1460 bool,
1461 *_list_methods(bool),
1462 *NUMERICS,
1463 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
1464 collections.deque,
1465 *_list_methods(collections.deque, list_non_mutating_methods),
1466 collections.deque.__iter__,
1467 collections.defaultdict,
1468 *_list_methods(collections.defaultdict, dict_non_mutating_methods),
1469 collections.defaultdict.__iter__,
1470 collections.OrderedDict,
1471 *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
1472 collections.OrderedDict.__iter__,
1473 collections.UserDict,
1474 *_list_methods(collections.UserDict, dict_non_mutating_methods),
1475 collections.UserDict.__iter__,
1476 collections.UserList,
1477 *_list_methods(collections.UserList, list_non_mutating_methods),
1478 collections.UserList.__iter__,
1479 collections.UserString,
1480 *_list_methods(collections.UserString, dir(str)),
1481 collections.UserString.__iter__,
1482 collections.Counter,
1483 *_list_methods(collections.Counter, dict_non_mutating_methods),
1484 collections.Counter.__iter__,
1485 collections.Counter.elements,
1486 collections.Counter.most_common,
1487 object.__dir__,
1488 type.__dir__,
1489 _Duck.__dir__,
1490}
1491
1492BUILTIN_GETATTR: set[MayHaveGetattr] = {
1493 *BUILTIN_GETITEM,
1494 set,
1495 frozenset,
1496 object,
1497 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
1498 *NUMERICS,
1499 dict_keys,
1500 MethodDescriptorType,
1501 ModuleType,
1502}
1503
1504
1505BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
1506
1507EVALUATION_POLICIES = {
1508 "minimal": EvaluationPolicy(
1509 allow_builtins_access=True,
1510 allow_locals_access=False,
1511 allow_globals_access=False,
1512 allow_item_access=False,
1513 allow_attr_access=False,
1514 allowed_calls=set(),
1515 allow_any_calls=False,
1516 allow_all_operations=False,
1517 ),
1518 "limited": SelectivePolicy(
1519 allowed_getitem=BUILTIN_GETITEM,
1520 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
1521 allowed_getattr=BUILTIN_GETATTR,
1522 allowed_getattr_external={
1523 # pandas Series/Frame implements custom `__getattr__`
1524 ("pandas", "DataFrame"),
1525 ("pandas", "Series"),
1526 },
1527 allowed_operations=BUILTIN_OPERATIONS,
1528 allow_builtins_access=True,
1529 allow_locals_access=True,
1530 allow_globals_access=True,
1531 allow_getitem_on_types=True,
1532 allowed_calls=ALLOWED_CALLS,
1533 ),
1534 "unsafe": EvaluationPolicy(
1535 allow_builtins_access=True,
1536 allow_locals_access=True,
1537 allow_globals_access=True,
1538 allow_attr_access=True,
1539 allow_item_access=True,
1540 allow_any_calls=True,
1541 allow_all_operations=True,
1542 ),
1543}
1544
1545
1546__all__ = [
1547 "guarded_eval",
1548 "eval_node",
1549 "GuardRejection",
1550 "EvaluationContext",
1551 "_unbind_method",
1552]