1from copy import copy
2from inspect import isclass, signature, Signature, getmodule
3from typing import (
4 Annotated,
5 AnyStr,
6 Callable,
7 Literal,
8 NamedTuple,
9 NewType,
10 Optional,
11 Protocol,
12 Sequence,
13 TypeGuard,
14 Union,
15 get_args,
16 get_origin,
17 is_typeddict,
18)
19import ast
20import builtins
21import collections
22import operator
23import sys
24import warnings
25from functools import cached_property
26from dataclasses import dataclass, field
27from types import MethodDescriptorType, ModuleType
28
29from IPython.utils.decorators import undoc
30
31
32from typing import Self, LiteralString
33
34if sys.version_info < (3, 12):
35 from typing_extensions import TypeAliasType
36else:
37 from typing import TypeAliasType
38
39
@undoc
class HasGetItem(Protocol):
    """Structural type for objects that implement ``__getitem__``."""

    def __getitem__(self, key) -> None: ...
43
44
@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type for callables (such as classes) returning objects with ``__getitem__``."""

    def __call__(self, *args, **kwargs) -> HasGetItem: ...
48
49
@undoc
class HasGetAttr(Protocol):
    """Structural type for objects that explicitly implement ``__getattr__``."""

    def __getattr__(self, key) -> None: ...
53
54
@undoc
class DoesNotHaveGetAttr(Protocol):
    """Empty protocol standing in for objects without an explicit ``__getattr__``."""

    pass
58
59
# By default `__getattr__` is not explicitly implemented on most objects,
# so the annotation used for attribute allow-lists accepts both kinds.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
62
63
def _unbind_method(func: Callable) -> Union[Callable, None]:
    """Get unbound method for given bound method.

    The lookup takes ``func.__name__`` and resolves it on the type of
    ``func.__self__``.

    Returns None if cannot get unbound method, or method is already unbound.
    This includes callables without ``__self__`` or ``__name__``, names that
    are shadowed in the owner's ``__dict__``, and names that do not exist on
    the owner's type (e.g. a classmethod inherited by a subclass, where the
    owner is the class and its type is the metaclass).
    """
    owner = getattr(func, "__self__", None)
    name = getattr(func, "__name__", None)
    if owner is None or not name:
        return None

    # An entry in the owner's own `__dict__` means the attribute found on the
    # owner is not (necessarily) the one defined on its type.
    instance_dict_overrides = getattr(owner, "__dict__", None)
    if instance_dict_overrides and name in instance_dict_overrides:
        return None

    # The default avoids leaking `AttributeError` when the type lacks `name`.
    return getattr(type(owner), name, None)
83
84
@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Each ``allow_*`` flag switches on one category of evaluation wholesale;
    ``allowed_calls`` lists individual callables that may be invoked even
    when ``allow_any_calls`` is off. All ``can_*`` predicates return a bool.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item) -> bool:
        """Return whether ``value[item]`` may be evaluated."""
        return self.allow_item_access

    def can_get_attr(self, value, attr) -> bool:
        """Return whether ``value.attr`` may be evaluated."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None) -> bool:
        """Return whether the operation implemented by ``dunders`` may run on ``a`` (and ``b``)."""
        return bool(self.allow_all_operations)

    def can_call(self, func) -> bool:
        """Return whether ``func`` may be called.

        A call is allowed when any call is allowed, when ``func`` itself is
        on ``allowed_calls``, or when the unbound method behind a bound
        ``func`` is on ``allowed_calls``.
        """
        if self.allow_any_calls:
            return True

        try:
            if func in self.allowed_calls:
                return True
        except TypeError:
            # Unhashable objects (e.g. a list used as a callee) cannot be
            # members of the allow-list; reject instead of crashing.
            pass

        owner_method = _unbind_method(func)

        if owner_method and owner_method in self.allowed_calls:
            return True

        return False
121
122
def _get_external(module_name: str, access_path: Sequence[str]):
    """Resolve a dotted access path inside an already imported module.

    Nothing is imported here: only ``sys.modules`` is consulted. When the
    attribute walk fails, the full dotted path is tried as a module name of
    its own, which covers modules living in namespace packages.

    Raises:
     * `KeyError` if the module is not found in ``sys.modules``, and
     * `AttributeError` if access path does not match an exported object
    """
    try:
        target = sys.modules[module_name]
        # standard module: walk the attributes one by one
        for part in access_path:
            target = getattr(target, part)
    except (KeyError, AttributeError):
        # handle modules in namespace packages
        dotted = ".".join((module_name, *access_path))
        if dotted not in sys.modules:
            raise
        return sys.modules[dotted]
    else:
        return target
144
145
def _has_original_dunder_external(
    value,
    module_name: str,
    access_path: Sequence[str],
    method_name: str,
):
    """Check ``value`` against one allow-listed external class or module.

    ``module_name`` plus ``access_path`` identify the allow-listed object,
    which is resolved with ``_get_external`` (so only already imported
    modules are considered).

    Returns True when:

    * the type of ``value`` is exactly the allow-listed class, or
    * the allow-listed object is a module and the type of ``value`` is
      defined in that module or in one of its submodules, or
    * ``value`` is an instance of the allow-listed class and its
      ``method_name`` is the very method defined by that class.

    Returns False otherwise, including when the allow-listed object cannot
    be resolved.
    """
    if module_name not in sys.modules:
        full_module_path = ".".join([module_name, *access_path])
        if full_module_path not in sys.modules:
            # LBYL as it is faster
            return False
    try:
        member_type = _get_external(module_name, access_path)
        value_type = type(value)
        if value_type == member_type:
            return True
        if isinstance(member_type, ModuleType):
            value_module = getmodule(value_type)
            if not value_module or not value_module.__name__:
                return False
            allowed_root = member_type.__name__
            value_module_name = value_module.__name__
            # Match on package boundaries only: a bare prefix test would let
            # e.g. `pandasql` pass for an allow-listed `pandas`.
            return value_module_name == allowed_root or value_module_name.startswith(
                allowed_root + "."
            )
        if method_name == "__getattribute__":
            # we have to short-circuit here due to an unresolved issue in
            # `isinstance` implementation: https://bugs.python.org/issue32683
            return False
        if isinstance(value, member_type):
            method = getattr(value_type, method_name, None)
            member_method = getattr(member_type, method_name, None)
            if member_method == method:
                return True
    except (AttributeError, KeyError):
        return False
    return False
179
180
def _has_original_dunder(
    value, allowed_types, allowed_methods, allowed_external, method_name
):
    """Tell whether ``method_name`` on the type of ``value`` is an allow-listed implementation.

    Returns True when the exact type is in ``allowed_types``, when the
    method found on the type is in ``allowed_methods``, or when one of the
    ``allowed_external`` paths vouches for it. Returns None when the type
    does not define the method at all, and False otherwise.
    """
    # note: Python ignores `__getattr__`/`__getitem__` on instances,
    # so inspecting the class is sufficient
    cls = type(value)

    # an exact type match makes inspecting the method unnecessary
    if cls in allowed_types:
        return True

    dunder = getattr(cls, method_name, None)
    if dunder is None:
        return None

    if dunder in allowed_methods:
        return True

    return any(
        _has_original_dunder_external(value, module_name, access_path, method_name)
        for module_name, *access_path in allowed_external
    )
205
206
207def _coerce_path_to_tuples(
208 allow_list: set[tuple[str, ...] | str],
209) -> set[tuple[str, ...]]:
210 """Replace dotted paths on the provided allow-list with tuples."""
211 return {
212 path if isinstance(path, tuple) else tuple(path.split("."))
213 for path in allow_list
214 }
215
216
@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy granting item access, attribute access and operations selectively.

    An operation on a value is accepted only when the relevant dunder method
    of its type is known to be unmodified: the type is allow-listed, the
    method is the one defined by an allow-listed type, or the value matches
    one of the allow-listed external paths (given as tuples or dotted
    strings).
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)

    # Lazily filled map from dunder name to the allow-listed implementations.
    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Return whether ``value.attr`` may be evaluated.

        Both ``__getattribute__`` and ``__getattr__`` (when defined) must be
        unmodified. A property on the class of ``value`` is accepted only if
        the class is allow-listed or the property is the one defined by an
        allow-listed external class.
        """
        allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)

        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattr__",
        )

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if not accept:
            return False

        # We still need to check for overridden properties.
        value_class = type(value)
        if not hasattr(value_class, attr):
            return True

        class_attr_val = getattr(value_class, attr)
        if not isinstance(class_attr_val, property):
            return True

        # Properties in allowed types are ok (although we do not include any
        # properties in our default allow list currently).
        if value_class in self.allowed_getattr:
            return True  # pragma: no cover

        # Properties in subclasses of allowed types may be ok if not changed.
        # Every allow-listed external class is consulted: returning after the
        # first one would make the answer depend on set iteration order.
        for module_name, *access_path in allowed_getattr_external:
            try:
                external_class = _get_external(module_name, access_path)
                external_class_attr_val = getattr(external_class, attr)
            except (KeyError, AttributeError):
                continue
            if class_attr_val == external_class_attr_val:
                return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether every dunder in ``dunders`` is unmodified on ``a`` (and ``b`` if given)."""
        allowed_operations_external = _coerce_path_to_tuples(
            self.allowed_operations_external
        )
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and cache) the implementations of ``dunder`` on the allow-listed operation types."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        """``__getitem__`` implementations of the allow-listed item-access types."""
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        """``__getattr__`` implementations of the allow-listed attribute-access types."""
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        """``__getattribute__`` implementations of the allow-listed attribute-access types."""
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect attribute ``name`` from each class, skipping classes where it is missing or falsy."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }
347
348
class _DummyNamedTuple(NamedTuple):
    """Used internally to retrieve methods of named tuple instance.

    The class declares no fields of its own.
    """
351
352
# Accepted evaluation policy identifiers. `forbidden` and `dangerous` are
# handled directly by `guarded_eval`; the others name entries of
# `EVALUATION_POLICIES`.
EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"]
354
355
class EvaluationContext(NamedTuple):
    """Namespaces and settings passed to :func:`guarded_eval` and :func:`eval_node`."""

    #: Local namespace
    locals: dict
    #: Global namespace
    globals: dict
    #: Evaluation policy identifier
    evaluation: EvaluationPolicyName = "forbidden"
    #: Whether the evaluation of code takes place inside of a subscript.
    #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
    in_subscript: bool = False
    #: Auto import method
    auto_import: Callable[list[str], ModuleType] | None = None
    # NOTE(review): the default below is a single dict object shared by every
    # context created without explicit overrides; it must not be mutated.
    #: Overrides for evaluation policy
    policy_overrides: dict = {}
370
371
class _IdentitySubscript:
    """Returns the key itself when item is requested via subscript.

    ``guarded_eval`` uses an instance of this class to recover the value of
    a subscript expression after wrapping the code in ``marker[...]``.
    """

    def __getitem__(self, key):
        return key
377
378
# Object bound to `SUBSCRIPT_MARKER` in the locals when evaluating subscript contents.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name under which `IDENTITY_SUBSCRIPT` is injected by `guarded_eval`.
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# Empty signature used when `inspect.signature` cannot provide one.
UNKNOWN_SIGNATURE = Signature()
# Sentinel meaning "no value could be produced"; compared by identity.
NOT_EVALUATED = object()
383
384
class GuardRejection(Exception):
    """Raised when the active evaluation policy refuses to evaluate an expression."""
389
390
def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate ``code`` under the policy named by ``context.evaluation``.

    * ``forbidden``: nothing is evaluated and :class:`GuardRejection` is raised;
    * ``dangerous``: the code is handed to the built-in :func:`eval`;
    * any other policy: the code is parsed and the resulting AST is
      evaluated by :func:`eval_node`.

    When ``context.in_subscript`` is set, ``code`` is treated as the content
    of a subscript; empty code evaluates to an empty tuple.
    """
    if context.evaluation == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: `ast.literal_eval` is not used as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # Slice syntax (`:`) is only valid inside a subscript, so the code is
        # wrapped in `marker[...]`; the marker resolves to an object whose
        # `__getitem__` hands the key back, which undoes the wrapping.
        if not code:
            return tuple()
        patched_locals = context.locals.copy()
        patched_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = "".join((SUBSCRIPT_MARKER, "[", code, "]"))
        context = EvaluationContext(**dict(context._asdict(), locals=patched_locals))

    if context.evaluation == "dangerous":
        # unrestricted evaluation: only reached when explicitly requested
        return eval(code, context.globals, context.locals)

    return eval_node(ast.parse(code, mode="eval"), context)
425
426
# Maps each binary operator AST node type to the dunder method implementing
# it on the left operand; the first entry is the method that gets called.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}
442
# Maps each comparison AST node type to the dunders checked by the policy.
# The first entry names the `operator` module function used to compute the
# result; the remaining entries are additional methods that must be allowed.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}
453
# Maps each unary operator AST node type to the dunders checked by the
# policy; the first entry is the method looked up on the operand.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    # NOTE(review): `__not__` is a function of the `operator` module rather
    # than a method defined by objects - confirm how this entry is consumed.
    ast.Not: ("__not__",),
}
461
462
class ImpersonatingDuck:
    """A dummy class used to create objects of other classes without calling their ``__init__``

    Instances get their ``__class__`` reassigned by
    ``_create_duck_for_heap_type``.
    """

    # no-op: override __class__ to impersonate
467
468
class _Duck:
    """A dummy class used to create objects pretending to have given attributes

    ``attributes`` maps attribute names to the values returned on attribute
    access; ``items`` maps keys to the values returned on subscript access.
    """

    def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None):
        self.attributes = attributes or {}
        self.items = items or {}

    def __getattr__(self, attr: str):
        """Return the pretended attribute; raise ``AttributeError`` if unknown."""
        # Read through `__dict__` so that a missing `attributes` entry (e.g.
        # on an instance created without `__init__` by `copy`) cannot send
        # this method into infinite recursion.
        attributes = self.__dict__.get("attributes", {})
        try:
            return attributes[attr]
        except KeyError:
            # `hasattr` and `getattr(..., default)` only understand
            # `AttributeError`; a `KeyError` would escape from them.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            ) from None

    def __hasattr__(self, attr: str):
        """Return whether ``attr`` is one of the pretended attributes."""
        return attr in self.attributes

    def __dir__(self):
        """List the base ``object`` attributes followed by the pretended ones."""
        # `dir(super)` would describe the `super` builtin type itself and
        # report names such as `__thisclass__` that a duck does not have.
        return [*dir(object), *self.attributes]

    def __getitem__(self, key: str):
        return self.items[key]

    def __hasitem__(self, key: str):
        """Return whether ``key`` is one of the pretended items."""
        return key in self.items

    def _ipython_key_completions_(self):
        """Return the keys offered for subscript completion."""
        return self.items.keys()
493
494
def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]:
    """Look up the dunder names registered for the AST operator ``node_op``.

    ``dunders`` maps AST operator types to tuples of dunder names. When
    several types match, the entry listed last wins; None is returned when
    nothing matches.
    """
    matches = [
        names for op_type, names in dunders.items() if isinstance(node_op, op_type)
    ]
    return matches[-1] if matches else None
501
502
def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Return a shallow copy of the policy named by the context with overrides applied.

    Overrides whose key is not an attribute of the policy are ignored.
    """
    policy = copy(EVALUATION_POLICIES[context.evaluation])
    overrides = context.policy_overrides

    for name in overrides:
        if hasattr(policy, name):
            setattr(policy, name, overrides[name])
    return policy
510
511
def _validate_policy_overrides(
    policy_name: EvaluationPolicyName, policy_overrides: dict
) -> bool:
    """Warn about overrides that the named policy does not define.

    Returns True when every override key is an attribute of the policy,
    False when at least one warning was issued.
    """
    policy = EVALUATION_POLICIES[policy_name]

    unknown_keys = [key for key in policy_overrides if not hasattr(policy, key)]
    for key in unknown_keys:
        warnings.warn(
            f"Override {key!r} is not valid with {policy_name!r} evaluation policy"
        )
    return not unknown_keys
525
526
def _eval_reflected_binary_op(
    dunder: str, left, right, policy: EvaluationPolicy, context: EvaluationContext
):
    """Evaluate a binary operation through the reflected method of ``right``.

    Used when ``left``'s ``dunder`` returned ``NotImplemented``. The
    reflected method (e.g. ``__radd__`` for ``__add__``) has to pass the
    policy check on ``right`` before it is called.
    """
    reflected = "__r" + dunder[2:]
    if not policy.can_operate((reflected,), right):
        raise GuardRejection(
            f"Operation (`{reflected}`) for",
            type(right),
            f"not allowed in {context.evaluation} mode",
        )
    method = getattr(right, reflected, None)
    result = NotImplemented if method is None else method(left)
    if result is NotImplemented:
        raise TypeError(
            f"unsupported operand type(s) for {dunder}: {type(left)} and {type(right)}"
        )
    return result


def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
    """Evaluate AST node in provided context.

    Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.

    Does not evaluate actions that always have side effects:

    - class definitions (``class sth: ...``)
    - function definitions (``def sth: ...``)
    - variable assignments (``x = 1``)
    - augmented assignments (``x += 1``)
    - deletions (``del x``)

    Does not evaluate operations which do not return values:

    - assertions (``assert x``)
    - pass (``pass``)
    - imports (``import x``)
    - control flow:

        - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
        - loops (``for`` and ``while``)
        - exception handling

    The purpose of this function is to guard against unwanted side-effects;
    it does not give guarantees on protection from malicious code execution.

    Raises :class:`GuardRejection` when the policy forbids an operation and
    ``ValueError`` for node types that are not handled.
    """
    policy = get_policy(context)

    if node is None:
        return None
    if isinstance(node, ast.Expression):
        return eval_node(node.body, context)
    if isinstance(node, ast.BinOp):
        left = eval_node(node.left, context)
        right = eval_node(node.right, context)
        dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, left, right):
                result = getattr(left, dunders[0])(right)
                if result is NotImplemented:
                    # e.g. `1 + 1.5`: `int.__add__` defers to `float.__radd__`
                    return _eval_reflected_binary_op(
                        dunders[0], left, right, policy, context
                    )
                return result
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(left),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Compare):
        left = eval_node(node.left, context)
        all_true = True
        for op, right in zip(node.ops, node.comparators):
            right = eval_node(right, context)
            # negation applies to a single link of a chained comparison
            negate = False
            dunder = None
            dunders = _find_dunder(op, COMP_OP_DUNDERS)
            if not dunders:
                if isinstance(op, ast.NotIn):
                    dunders = COMP_OP_DUNDERS[ast.In]
                    negate = True
                if isinstance(op, ast.Is):
                    dunder = "is_"
                if isinstance(op, ast.IsNot):
                    dunder = "is_"
                    negate = True
            if not dunder and dunders:
                dunder = dunders[0]
            if dunder:
                # `operator.__contains__(a, b)` tests `b in a`
                a, b = (right, left) if dunder == "__contains__" else (left, right)
                if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
                    result = getattr(operator, dunder)(a, b)
                    if negate:
                        result = not result
                    if not result:
                        all_true = False
                    left = right
                else:
                    raise GuardRejection(
                        f"Comparison (`{dunder}`) for",
                        type(left),
                        f"not allowed in {context.evaluation} mode",
                    )
            else:
                raise ValueError(
                    f"Comparison `{dunder}` not supported"
                )  # pragma: no cover
        return all_true
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Tuple):
        return tuple(eval_node(e, context) for e in node.elts)
    if isinstance(node, ast.List):
        return [eval_node(e, context) for e in node.elts]
    if isinstance(node, ast.Set):
        return {eval_node(e, context) for e in node.elts}
    if isinstance(node, ast.Dict):
        return dict(
            zip(
                [eval_node(k, context) for k in node.keys],
                [eval_node(v, context) for v in node.values],
            )
        )
    if isinstance(node, ast.Slice):
        return slice(
            eval_node(node.lower, context),
            eval_node(node.upper, context),
            eval_node(node.step, context),
        )
    if isinstance(node, ast.UnaryOp):
        value = eval_node(node.operand, context)
        dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, value):
                if isinstance(node.op, ast.Not):
                    # objects do not define `__not__`; negate truthiness directly
                    return not value
                try:
                    return getattr(value, dunders[0])()
                except AttributeError:
                    raise TypeError(
                        f"bad operand type for unary {node.op}: {type(value)}"
                    )
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(value),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Subscript):
        value = eval_node(node.value, context)
        slice_ = eval_node(node.slice, context)
        if policy.can_get_item(value, slice_):
            return value[slice_]
        raise GuardRejection(
            "Subscript access (`__getitem__`) for",
            type(value),  # not joined to avoid calling `repr`
            f" not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.Name):
        return _eval_node_name(node.id, context)
    if isinstance(node, ast.Attribute):
        value = eval_node(node.value, context)
        if policy.can_get_attr(value, node.attr):
            return getattr(value, node.attr)
        raise GuardRejection(
            "Attribute access (`__getattr__`) for",
            type(value),  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.IfExp):
        test = eval_node(node.test, context)
        if test:
            return eval_node(node.body, context)
        else:
            return eval_node(node.orelse, context)
    if isinstance(node, ast.Call):
        func = eval_node(node.func, context)
        if policy.can_call(func) and not node.keywords:
            args = [eval_node(arg, context) for arg in node.args]
            return func(*args)
        if isclass(func):
            # this code path gets entered when calling class e.g. `MyClass()`
            # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
            # Should return `MyClass` if `__new__` is not overridden,
            # otherwise whatever `__new__` return type is.
            overridden_return_type = _eval_return_type(func.__new__, node, context)
            if overridden_return_type is not NOT_EVALUATED:
                return overridden_return_type
            return _create_duck_for_heap_type(func)
        else:
            # the call itself is not allowed: fall back to the return annotation
            return_type = _eval_return_type(func, node, context)
            if return_type is not NOT_EVALUATED:
                return return_type
        raise GuardRejection(
            "Call for",
            func,  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    raise ValueError("Unhandled node", ast.dump(node))
701
702
def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Derive a stand-in for the value returned by ``func`` from its return annotation.

    Returns whatever ``_resolve_annotation`` produces for the annotation, or
    the NOT_EVALUATED sentinel when the callable has no return annotation.
    """
    try:
        sig = signature(func)
    except ValueError:
        # no signature could be found for this callable
        sig = UNKNOWN_SIGNATURE

    # if annotation was not stringized, or it was stringized
    # but resolved by signature call we know the return type
    annotation = sig.return_annotation
    if annotation is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(annotation, sig, func, node, context)
718
719
def _resolve_annotation(
    annotation,
    sig: Signature,
    func: Callable,
    node: ast.Call,
    context: EvaluationContext,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    String annotations are first looked up by name in the context. The
    result is a concrete value, a duck imitating the annotated type, or the
    NOT_EVALUATED sentinel when nothing could be derived from the annotation
    (for example a ``Literal`` with several values, or ``AnyStr`` without a
    matching positional argument).
    """
    # PEP 604 unions (`X | Y`) report this origin instead of `typing.Union`
    from types import UnionType

    annotation = (
        _eval_node_name(annotation, context)
        if isinstance(annotation, str)
        else annotation
    )
    origin = get_origin(annotation)
    if annotation is Self and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        # the result has the type of the first argument annotated as AnyStr
        index = None
        for i, (key, value) in enumerate(sig.parameters.items()):
            if value.annotation is AnyStr:
                index = i
                break
        if index is not None and index < len(node.args):
            return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is Union or origin is UnionType:
        # offer the attributes of every member of the union
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, sig, func, node, context))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, sig, func, node, context)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, sig, func, node, context)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, node, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, node, context)
    else:
        return _eval_or_create_duck(annotation, node, context)
    # Branches that could not produce a value end up here; an implicit
    # `None` would be mistaken by callers for an evaluated result.
    return NOT_EVALUATED
778
779
def _eval_node_name(node_id: str, context: EvaluationContext):
    """Resolve the name ``node_id`` in the namespaces the policy allows.

    Locals are consulted first, then globals, then builtins, and finally
    the context's auto-import hook. Raises :class:`GuardRejection` when
    neither locals nor globals may be accessed, ``NameError`` otherwise.
    """
    policy = get_policy(context)

    if policy.allow_locals_access and node_id in context.locals:
        return context.locals[node_id]

    if policy.allow_globals_access and node_id in context.globals:
        return context.globals[node_id]

    if policy.allow_builtins_access and hasattr(builtins, node_id):
        # note: do not use __builtins__, it is implementation detail of cPython
        return getattr(builtins, node_id)

    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if policy.allow_globals_access or policy.allow_locals_access:
        raise NameError(f"{node_id} not found in locals, globals, nor builtins")

    raise GuardRejection(
        f"Namespace access not allowed in {context.evaluation} mode"
    )
797
798
def _eval_or_create_duck(duck_type, node: ast.Call, context: EvaluationContext):
    """Instantiate ``duck_type`` if calling it is allowed, otherwise imitate it.

    The positional arguments of the call ``node`` are evaluated and passed
    on when the type is instantiated; calls with keyword arguments are
    never executed.
    """
    policy = get_policy(context)
    may_instantiate = policy.can_call(duck_type) and not node.keywords

    if not may_instantiate:
        # if custom class is in type annotation, mock it
        return _create_duck_for_heap_type(duck_type)

    # if allow-listed builtin is on type annotation, instantiate it
    positional = [eval_node(arg, context) for arg in node.args]
    return duck_type(*positional)
807
808
def _create_duck_for_heap_type(duck_type):
    """Build an object that claims to be an instance of ``duck_type`` (a duck).

    Returns the duck, or the NOT_EVALUATED sentinel when assigning the
    class is refused with ``TypeError``.
    """
    duck = ImpersonatingDuck()
    try:
        # this only works for heap types, not builtins
        duck.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return duck
822
823
# External classes whose `__getitem__` the `limited` policy accepts. Each
# entry is a module name followed by the attribute path to the class.
SUPPORTED_EXTERNAL_GETITEM = {
    ("pandas", "core", "indexing", "_iLocIndexer"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    ("numpy", "ndarray"),
    ("numpy", "void"),
}
832
833
# Built-in and standard-library types whose `__getitem__` the `limited`
# policy accepts, plus the helper classes defined in this module.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    _DummyNamedTuple,
    _IdentitySubscript,
}
850
851
def _list_methods(cls, source=None):
    """Return the attributes of ``cls`` named in ``source``.

    All names reported by ``dir(cls)`` are used when ``source`` is empty or
    not given. For use on immutable objects or with methods returning a copy.
    """
    names = source if source else dir(cls)
    return [getattr(cls, name) for name in names]
855
856
# Names of methods that are treated as safe to call on dict-like and
# list-like containers when building `ALLOWED_CALLS`.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# Names shared by `set` and `frozenset`.
set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))


# Concrete type of the object returned by `dict.keys()`.
dict_keys: type[collections.abc.KeysView] = type({}.keys())

# Built-in numeric types.
NUMERICS = {int, float, complex}
865
# Callables that the `limited` policy is allowed to invoke: constructors of
# built-in and `collections` types together with selected methods of theirs.
# Mutable containers contribute only the methods named in the
# `*_non_mutating_methods` tuples; the other types contribute everything
# reported by `dir()`.
ALLOWED_CALLS = {
    bytes,
    *_list_methods(bytes),
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict_keys.isdisjoint,
    list,
    *_list_methods(list, list_non_mutating_methods),
    set,
    *_list_methods(set, set_non_mutating_methods),
    frozenset,
    *_list_methods(frozenset),
    range,
    str,
    *_list_methods(str),
    tuple,
    *_list_methods(tuple),
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserString,
    *_list_methods(collections.UserString, dir(str)),
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.elements,
    collections.Counter.most_common,
}
902
# Types whose attribute access the `limited` policy accepts: everything
# allowed for item access plus the types listed below.
BUILTIN_GETATTR: set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Types on which the `limited` policy accepts operators; a separate set with
# the same members as `BUILTIN_GETATTR`.
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
917
# Policy objects by name. `get_policy` copies the selected entry before
# applying per-context overrides, so the objects here serve as templates.
EVALUATION_POLICIES = {
    # only names from builtins can be resolved; nothing else is allowed
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    # namespaces are readable; item/attribute access, operations and calls
    # are restricted to the allow-lists defined in this module
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    # every category handled by `eval_node` is allowed
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}
954
955
# Names exported by `from <module> import *`; the private `_unbind_method`
# is listed explicitly.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]