Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

432 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import operator 

23import sys 

24import warnings 

25from functools import cached_property 

26from dataclasses import dataclass, field 

27from types import MethodDescriptorType, ModuleType 

28 

29from IPython.utils.decorators import undoc 

30 

31 

32from typing import Self, LiteralString 

33 

34if sys.version_info < (3, 12): 

35 from typing_extensions import TypeAliasType 

36else: 

37 from typing import TypeAliasType 

38 

39 

40@undoc 

41class HasGetItem(Protocol): 

42 def __getitem__(self, key) -> None: ... 

43 

44 

45@undoc 

46class InstancesHaveGetItem(Protocol): 

47 def __call__(self, *args, **kwargs) -> HasGetItem: ... 

48 

49 

50@undoc 

51class HasGetAttr(Protocol): 

52 def __getattr__(self, key) -> None: ... 

53 

54 

55@undoc 

56class DoesNotHaveGetAttr(Protocol): 

57 pass 

58 

59 

60# By default `__getattr__` is not explicitly implemented on most objects 

61MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr] 

62 

63 

64def _unbind_method(func: Callable) -> Union[Callable, None]: 

65 """Get unbound method for given bound method. 

66 

67 Returns None if cannot get unbound method, or method is already unbound. 

68 """ 

69 owner = getattr(func, "__self__", None) 

70 owner_class = type(owner) 

71 name = getattr(func, "__name__", None) 

72 instance_dict_overrides = getattr(owner, "__dict__", None) 

73 if ( 

74 owner is not None 

75 and name 

76 and ( 

77 not instance_dict_overrides 

78 or (instance_dict_overrides and name not in instance_dict_overrides) 

79 ) 

80 ): 

81 return getattr(owner_class, name) 

82 return None 

83 

84 

85@undoc 

86@dataclass 

87class EvaluationPolicy: 

88 """Definition of evaluation policy.""" 

89 

90 allow_locals_access: bool = False 

91 allow_globals_access: bool = False 

92 allow_item_access: bool = False 

93 allow_attr_access: bool = False 

94 allow_builtins_access: bool = False 

95 allow_all_operations: bool = False 

96 allow_any_calls: bool = False 

97 allow_auto_import: bool = False 

98 allowed_calls: set[Callable] = field(default_factory=set) 

99 

100 def can_get_item(self, value, item): 

101 return self.allow_item_access 

102 

103 def can_get_attr(self, value, attr): 

104 return self.allow_attr_access 

105 

106 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

107 if self.allow_all_operations: 

108 return True 

109 

110 def can_call(self, func): 

111 if self.allow_any_calls: 

112 return True 

113 

114 if func in self.allowed_calls: 

115 return True 

116 

117 owner_method = _unbind_method(func) 

118 

119 if owner_method and owner_method in self.allowed_calls: 

120 return True 

121 

122 

123def _get_external(module_name: str, access_path: Sequence[str]): 

124 """Get value from external module given a dotted access path. 

125 

126 Only gets value if the module is already imported. 

127 

128 Raises: 

129 * `KeyError` if module is removed not found, and 

130 * `AttributeError` if access path does not match an exported object 

131 """ 

132 try: 

133 member_type = sys.modules[module_name] 

134 # standard module 

135 for attr in access_path: 

136 member_type = getattr(member_type, attr) 

137 return member_type 

138 except (KeyError, AttributeError): 

139 # handle modules in namespace packages 

140 module_path = ".".join([module_name, *access_path]) 

141 if module_path in sys.modules: 

142 return sys.modules[module_path] 

143 raise 

144 

145 

146def _has_original_dunder_external( 

147 value, 

148 module_name: str, 

149 access_path: Sequence[str], 

150 method_name: str, 

151): 

152 if module_name not in sys.modules: 

153 full_module_path = ".".join([module_name, *access_path]) 

154 if full_module_path not in sys.modules: 

155 # LBYLB as it is faster 

156 return False 

157 try: 

158 member_type = _get_external(module_name, access_path) 

159 value_type = type(value) 

160 if type(value) == member_type: 

161 return True 

162 if isinstance(member_type, ModuleType): 

163 value_module = getmodule(value_type) 

164 if not value_module or not value_module.__name__: 

165 return False 

166 if value_module.__name__.startswith(member_type.__name__): 

167 return True 

168 if method_name == "__getattribute__": 

169 # we have to short-circuit here due to an unresolved issue in 

170 # `isinstance` implementation: https://bugs.python.org/issue32683 

171 return False 

172 if not isinstance(member_type, ModuleType) and isinstance(value, member_type): 

173 method = getattr(value_type, method_name, None) 

174 member_method = getattr(member_type, method_name, None) 

175 if member_method == method: 

176 return True 

177 except (AttributeError, KeyError): 

178 return False 

179 

180 

181def _has_original_dunder( 

182 value, allowed_types, allowed_methods, allowed_external, method_name 

183): 

184 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

185 # we only need to check at class level 

186 value_type = type(value) 

187 

188 # strict type check passes → no need to check method 

189 if value_type in allowed_types: 

190 return True 

191 

192 method = getattr(value_type, method_name, None) 

193 

194 if method is None: 

195 return None 

196 

197 if method in allowed_methods: 

198 return True 

199 

200 for module_name, *access_path in allowed_external: 

201 if _has_original_dunder_external(value, module_name, access_path, method_name): 

202 return True 

203 

204 return False 

205 

206 

207def _coerce_path_to_tuples( 

208 allow_list: set[tuple[str, ...] | str], 

209) -> set[tuple[str, ...]]: 

210 """Replace dotted paths on the provided allow-list with tuples.""" 

211 return { 

212 path if isinstance(path, tuple) else tuple(path.split(".")) 

213 for path in allow_list 

214 } 

215 

216 

217@undoc 

218@dataclass 

219class SelectivePolicy(EvaluationPolicy): 

220 allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set) 

221 allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set) 

222 

223 allowed_getattr: set[MayHaveGetattr] = field(default_factory=set) 

224 allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set) 

225 

226 allowed_operations: set = field(default_factory=set) 

227 allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set) 

228 

229 _operation_methods_cache: dict[str, set[Callable]] = field( 

230 default_factory=dict, init=False 

231 ) 

232 

233 def can_get_attr(self, value, attr): 

234 allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external) 

235 

236 has_original_attribute = _has_original_dunder( 

237 value, 

238 allowed_types=self.allowed_getattr, 

239 allowed_methods=self._getattribute_methods, 

240 allowed_external=allowed_getattr_external, 

241 method_name="__getattribute__", 

242 ) 

243 has_original_attr = _has_original_dunder( 

244 value, 

245 allowed_types=self.allowed_getattr, 

246 allowed_methods=self._getattr_methods, 

247 allowed_external=allowed_getattr_external, 

248 method_name="__getattr__", 

249 ) 

250 

251 accept = False 

252 

253 # Many objects do not have `__getattr__`, this is fine. 

254 if has_original_attr is None and has_original_attribute: 

255 accept = True 

256 else: 

257 # Accept objects without modifications to `__getattr__` and `__getattribute__` 

258 accept = has_original_attr and has_original_attribute 

259 

260 if accept: 

261 # We still need to check for overridden properties. 

262 

263 value_class = type(value) 

264 if not hasattr(value_class, attr): 

265 return True 

266 

267 class_attr_val = getattr(value_class, attr) 

268 is_property = isinstance(class_attr_val, property) 

269 

270 if not is_property: 

271 return True 

272 

273 # Properties in allowed types are ok (although we do not include any 

274 # properties in our default allow list currently). 

275 if type(value) in self.allowed_getattr: 

276 return True # pragma: no cover 

277 

278 # Properties in subclasses of allowed types may be ok if not changed 

279 for module_name, *access_path in allowed_getattr_external: 

280 try: 

281 external_class = _get_external(module_name, access_path) 

282 external_class_attr_val = getattr(external_class, attr) 

283 except (KeyError, AttributeError): 

284 return False # pragma: no cover 

285 return class_attr_val == external_class_attr_val 

286 

287 return False 

288 

289 def can_get_item(self, value, item): 

290 """Allow accessing `__getiitem__` of allow-listed instances unless it was not modified.""" 

291 allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external) 

292 return _has_original_dunder( 

293 value, 

294 allowed_types=self.allowed_getitem, 

295 allowed_methods=self._getitem_methods, 

296 allowed_external=allowed_getitem_external, 

297 method_name="__getitem__", 

298 ) 

299 

300 def can_operate(self, dunders: tuple[str, ...], a, b=None): 

301 allowed_operations_external = _coerce_path_to_tuples( 

302 self.allowed_operations_external 

303 ) 

304 objects = [a] 

305 if b is not None: 

306 objects.append(b) 

307 return all( 

308 [ 

309 _has_original_dunder( 

310 obj, 

311 allowed_types=self.allowed_operations, 

312 allowed_methods=self._operator_dunder_methods(dunder), 

313 allowed_external=allowed_operations_external, 

314 method_name=dunder, 

315 ) 

316 for dunder in dunders 

317 for obj in objects 

318 ] 

319 ) 

320 

321 def _operator_dunder_methods(self, dunder: str) -> set[Callable]: 

322 if dunder not in self._operation_methods_cache: 

323 self._operation_methods_cache[dunder] = self._safe_get_methods( 

324 self.allowed_operations, dunder 

325 ) 

326 return self._operation_methods_cache[dunder] 

327 

328 @cached_property 

329 def _getitem_methods(self) -> set[Callable]: 

330 return self._safe_get_methods(self.allowed_getitem, "__getitem__") 

331 

332 @cached_property 

333 def _getattr_methods(self) -> set[Callable]: 

334 return self._safe_get_methods(self.allowed_getattr, "__getattr__") 

335 

336 @cached_property 

337 def _getattribute_methods(self) -> set[Callable]: 

338 return self._safe_get_methods(self.allowed_getattr, "__getattribute__") 

339 

340 def _safe_get_methods(self, classes, name) -> set[Callable]: 

341 return { 

342 method 

343 for class_ in classes 

344 for method in [getattr(class_, name, None)] 

345 if method 

346 } 

347 

348 

349class _DummyNamedTuple(NamedTuple): 

350 """Used internally to retrieve methods of named tuple instance.""" 

351 

352 

353EvaluationPolicyName = Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] 

354 

355 

356class EvaluationContext(NamedTuple): 

357 #: Local namespace 

358 locals: dict 

359 #: Global namespace 

360 globals: dict 

361 #: Evaluation policy identifier 

362 evaluation: EvaluationPolicyName = "forbidden" 

363 #: Whether the evaluation of code takes place inside of a subscript. 

364 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. 

365 in_subscript: bool = False 

366 #: Auto import method 

367 auto_import: Callable[list[str], ModuleType] | None = None 

368 #: Overrides for evaluation policy 

369 policy_overrides: dict = {} 

370 

371 

372class _IdentitySubscript: 

373 """Returns the key itself when item is requested via subscript.""" 

374 

375 def __getitem__(self, key): 

376 return key 

377 

378 

379IDENTITY_SUBSCRIPT = _IdentitySubscript() 

380SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__" 

381UNKNOWN_SIGNATURE = Signature() 

382NOT_EVALUATED = object() 

383 

384 

385class GuardRejection(Exception): 

386 """Exception raised when guard rejects evaluation attempt.""" 

387 

388 pass 

389 

390 

391def guarded_eval(code: str, context: EvaluationContext): 

392 """Evaluate provided code in the evaluation context. 

393 

394 If evaluation policy given by context is set to ``forbidden`` 

395 no evaluation will be performed; if it is set to ``dangerous`` 

396 standard :func:`eval` will be used; finally, for any other, 

397 policy :func:`eval_node` will be called on parsed AST. 

398 """ 

399 locals_ = context.locals 

400 

401 if context.evaluation == "forbidden": 

402 raise GuardRejection("Forbidden mode") 

403 

404 # note: not using `ast.literal_eval` as it does not implement 

405 # getitem at all, for example it fails on simple `[0][1]` 

406 

407 if context.in_subscript: 

408 # syntactic sugar for ellipsis (:) is only available in subscripts 

409 # so we need to trick the ast parser into thinking that we have 

410 # a subscript, but we need to be able to later recognise that we did 

411 # it so we can ignore the actual __getitem__ operation 

412 if not code: 

413 return tuple() 

414 locals_ = locals_.copy() 

415 locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT 

416 code = SUBSCRIPT_MARKER + "[" + code + "]" 

417 context = EvaluationContext(**{**context._asdict(), **{"locals": locals_}}) 

418 

419 if context.evaluation == "dangerous": 

420 return eval(code, context.globals, context.locals) 

421 

422 expression = ast.parse(code, mode="eval") 

423 

424 return eval_node(expression, context) 

425 

426 

427BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = { 

428 ast.Add: ("__add__",), 

429 ast.Sub: ("__sub__",), 

430 ast.Mult: ("__mul__",), 

431 ast.Div: ("__truediv__",), 

432 ast.FloorDiv: ("__floordiv__",), 

433 ast.Mod: ("__mod__",), 

434 ast.Pow: ("__pow__",), 

435 ast.LShift: ("__lshift__",), 

436 ast.RShift: ("__rshift__",), 

437 ast.BitOr: ("__or__",), 

438 ast.BitXor: ("__xor__",), 

439 ast.BitAnd: ("__and__",), 

440 ast.MatMult: ("__matmul__",), 

441} 

442 

443COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = { 

444 ast.Eq: ("__eq__",), 

445 ast.NotEq: ("__ne__", "__eq__"), 

446 ast.Lt: ("__lt__", "__gt__"), 

447 ast.LtE: ("__le__", "__ge__"), 

448 ast.Gt: ("__gt__", "__lt__"), 

449 ast.GtE: ("__ge__", "__le__"), 

450 ast.In: ("__contains__",), 

451 # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially 

452} 

453 

454UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = { 

455 ast.USub: ("__neg__",), 

456 ast.UAdd: ("__pos__",), 

457 # we have to check both __inv__ and __invert__! 

458 ast.Invert: ("__invert__", "__inv__"), 

459 ast.Not: ("__not__",), 

460} 

461 

462 

463class ImpersonatingDuck: 

464 """A dummy class used to create objects of other classes without calling their ``__init__``""" 

465 

466 # no-op: override __class__ to impersonate 

467 

468 

469class _Duck: 

470 """A dummy class used to create objects pretending to have given attributes""" 

471 

472 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

473 self.attributes = attributes or {} 

474 self.items = items or {} 

475 

476 def __getattr__(self, attr: str): 

477 return self.attributes[attr] 

478 

479 def __hasattr__(self, attr: str): 

480 return attr in self.attributes 

481 

482 def __dir__(self): 

483 return [*dir(super), *self.attributes] 

484 

485 def __getitem__(self, key: str): 

486 return self.items[key] 

487 

488 def __hasitem__(self, key: str): 

489 return self.items[key] 

490 

491 def _ipython_key_completions_(self): 

492 return self.items.keys() 

493 

494 

495def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

496 dunder = None 

497 for op, candidate_dunder in dunders.items(): 

498 if isinstance(node_op, op): 

499 dunder = candidate_dunder 

500 return dunder 

501 

502 

503def get_policy(context: EvaluationContext) -> EvaluationPolicy: 

504 policy = copy(EVALUATION_POLICIES[context.evaluation]) 

505 

506 for key, value in context.policy_overrides.items(): 

507 if hasattr(policy, key): 

508 setattr(policy, key, value) 

509 return policy 

510 

511 

512def _validate_policy_overrides( 

513 policy_name: EvaluationPolicyName, policy_overrides: dict 

514) -> bool: 

515 policy = EVALUATION_POLICIES[policy_name] 

516 

517 all_good = True 

518 for key, value in policy_overrides.items(): 

519 if not hasattr(policy, key): 

520 warnings.warn( 

521 f"Override {key!r} is not valid with {policy_name!r} evaluation policy" 

522 ) 

523 all_good = False 

524 return all_good 

525 

526 

527def eval_node(node: Union[ast.AST, None], context: EvaluationContext): 

528 """Evaluate AST node in provided context. 

529 

530 Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. 

531 

532 Does not evaluate actions that always have side effects: 

533 

534 - class definitions (``class sth: ...``) 

535 - function definitions (``def sth: ...``) 

536 - variable assignments (``x = 1``) 

537 - augmented assignments (``x += 1``) 

538 - deletions (``del x``) 

539 

540 Does not evaluate operations which do not return values: 

541 

542 - assertions (``assert x``) 

543 - pass (``pass``) 

544 - imports (``import x``) 

545 - control flow: 

546 

547 - conditionals (``if x:``) except for ternary IfExp (``a if x else b``) 

548 - loops (``for`` and ``while``) 

549 - exception handling 

550 

551 The purpose of this function is to guard against unwanted side-effects; 

552 it does not give guarantees on protection from malicious code execution. 

553 """ 

554 policy = get_policy(context) 

555 

556 if node is None: 

557 return None 

558 if isinstance(node, ast.Expression): 

559 return eval_node(node.body, context) 

560 if isinstance(node, ast.BinOp): 

561 left = eval_node(node.left, context) 

562 right = eval_node(node.right, context) 

563 dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) 

564 if dunders: 

565 if policy.can_operate(dunders, left, right): 

566 return getattr(left, dunders[0])(right) 

567 else: 

568 raise GuardRejection( 

569 f"Operation (`{dunders}`) for", 

570 type(left), 

571 f"not allowed in {context.evaluation} mode", 

572 ) 

573 if isinstance(node, ast.Compare): 

574 left = eval_node(node.left, context) 

575 all_true = True 

576 negate = False 

577 for op, right in zip(node.ops, node.comparators): 

578 right = eval_node(right, context) 

579 dunder = None 

580 dunders = _find_dunder(op, COMP_OP_DUNDERS) 

581 if not dunders: 

582 if isinstance(op, ast.NotIn): 

583 dunders = COMP_OP_DUNDERS[ast.In] 

584 negate = True 

585 if isinstance(op, ast.Is): 

586 dunder = "is_" 

587 if isinstance(op, ast.IsNot): 

588 dunder = "is_" 

589 negate = True 

590 if not dunder and dunders: 

591 dunder = dunders[0] 

592 if dunder: 

593 a, b = (right, left) if dunder == "__contains__" else (left, right) 

594 if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): 

595 result = getattr(operator, dunder)(a, b) 

596 if negate: 

597 result = not result 

598 if not result: 

599 all_true = False 

600 left = right 

601 else: 

602 raise GuardRejection( 

603 f"Comparison (`{dunder}`) for", 

604 type(left), 

605 f"not allowed in {context.evaluation} mode", 

606 ) 

607 else: 

608 raise ValueError( 

609 f"Comparison `{dunder}` not supported" 

610 ) # pragma: no cover 

611 return all_true 

612 if isinstance(node, ast.Constant): 

613 return node.value 

614 if isinstance(node, ast.Tuple): 

615 return tuple(eval_node(e, context) for e in node.elts) 

616 if isinstance(node, ast.List): 

617 return [eval_node(e, context) for e in node.elts] 

618 if isinstance(node, ast.Set): 

619 return {eval_node(e, context) for e in node.elts} 

620 if isinstance(node, ast.Dict): 

621 return dict( 

622 zip( 

623 [eval_node(k, context) for k in node.keys], 

624 [eval_node(v, context) for v in node.values], 

625 ) 

626 ) 

627 if isinstance(node, ast.Slice): 

628 return slice( 

629 eval_node(node.lower, context), 

630 eval_node(node.upper, context), 

631 eval_node(node.step, context), 

632 ) 

633 if isinstance(node, ast.UnaryOp): 

634 value = eval_node(node.operand, context) 

635 dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) 

636 if dunders: 

637 if policy.can_operate(dunders, value): 

638 try: 

639 return getattr(value, dunders[0])() 

640 except AttributeError: 

641 raise TypeError( 

642 f"bad operand type for unary {node.op}: {type(value)}" 

643 ) 

644 else: 

645 raise GuardRejection( 

646 f"Operation (`{dunders}`) for", 

647 type(value), 

648 f"not allowed in {context.evaluation} mode", 

649 ) 

650 if isinstance(node, ast.Subscript): 

651 value = eval_node(node.value, context) 

652 slice_ = eval_node(node.slice, context) 

653 if policy.can_get_item(value, slice_): 

654 return value[slice_] 

655 raise GuardRejection( 

656 "Subscript access (`__getitem__`) for", 

657 type(value), # not joined to avoid calling `repr` 

658 f" not allowed in {context.evaluation} mode", 

659 ) 

660 if isinstance(node, ast.Name): 

661 return _eval_node_name(node.id, context) 

662 if isinstance(node, ast.Attribute): 

663 value = eval_node(node.value, context) 

664 if policy.can_get_attr(value, node.attr): 

665 return getattr(value, node.attr) 

666 raise GuardRejection( 

667 "Attribute access (`__getattr__`) for", 

668 type(value), # not joined to avoid calling `repr` 

669 f"not allowed in {context.evaluation} mode", 

670 ) 

671 if isinstance(node, ast.IfExp): 

672 test = eval_node(node.test, context) 

673 if test: 

674 return eval_node(node.body, context) 

675 else: 

676 return eval_node(node.orelse, context) 

677 if isinstance(node, ast.Call): 

678 func = eval_node(node.func, context) 

679 if policy.can_call(func) and not node.keywords: 

680 args = [eval_node(arg, context) for arg in node.args] 

681 return func(*args) 

682 if isclass(func): 

683 # this code path gets entered when calling class e.g. `MyClass()` 

684 # or `my_instance.__class__()` - in both cases `func` is `MyClass`. 

685 # Should return `MyClass` if `__new__` is not overridden, 

686 # otherwise whatever `__new__` return type is. 

687 overridden_return_type = _eval_return_type(func.__new__, node, context) 

688 if overridden_return_type is not NOT_EVALUATED: 

689 return overridden_return_type 

690 return _create_duck_for_heap_type(func) 

691 else: 

692 return_type = _eval_return_type(func, node, context) 

693 if return_type is not NOT_EVALUATED: 

694 return return_type 

695 raise GuardRejection( 

696 "Call for", 

697 func, # not joined to avoid calling `repr` 

698 f"not allowed in {context.evaluation} mode", 

699 ) 

700 raise ValueError("Unhandled node", ast.dump(node)) 

701 

702 

703def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext): 

704 """Evaluate return type of a given callable function. 

705 

706 Returns the built-in type, a duck or NOT_EVALUATED sentinel. 

707 """ 

708 try: 

709 sig = signature(func) 

710 except ValueError: 

711 sig = UNKNOWN_SIGNATURE 

712 # if annotation was not stringized, or it was stringized 

713 # but resolved by signature call we know the return type 

714 not_empty = sig.return_annotation is not Signature.empty 

715 if not_empty: 

716 return _resolve_annotation(sig.return_annotation, sig, func, node, context) 

717 return NOT_EVALUATED 

718 

719 

720def _resolve_annotation( 

721 annotation, 

722 sig: Signature, 

723 func: Callable, 

724 node: ast.Call, 

725 context: EvaluationContext, 

726): 

727 """Resolve annotation created by user with `typing` module and custom objects.""" 

728 annotation = ( 

729 _eval_node_name(annotation, context) 

730 if isinstance(annotation, str) 

731 else annotation 

732 ) 

733 origin = get_origin(annotation) 

734 if annotation is Self and hasattr(func, "__self__"): 

735 return func.__self__ 

736 elif origin is Literal: 

737 type_args = get_args(annotation) 

738 if len(type_args) == 1: 

739 return type_args[0] 

740 elif annotation is LiteralString: 

741 return "" 

742 elif annotation is AnyStr: 

743 index = None 

744 for i, (key, value) in enumerate(sig.parameters.items()): 

745 if value.annotation is AnyStr: 

746 index = i 

747 break 

748 if index is not None and index < len(node.args): 

749 return eval_node(node.args[index], context) 

750 elif origin is TypeGuard: 

751 return False 

752 elif origin is Union: 

753 attributes = [ 

754 attr 

755 for type_arg in get_args(annotation) 

756 for attr in dir(_resolve_annotation(type_arg, sig, func, node, context)) 

757 ] 

758 return _Duck(attributes=dict.fromkeys(attributes)) 

759 elif is_typeddict(annotation): 

760 return _Duck( 

761 attributes=dict.fromkeys(dir(dict())), 

762 items={ 

763 k: _resolve_annotation(v, sig, func, node, context) 

764 for k, v in annotation.__annotations__.items() 

765 }, 

766 ) 

767 elif hasattr(annotation, "_is_protocol"): 

768 return _Duck(attributes=dict.fromkeys(dir(annotation))) 

769 elif origin is Annotated: 

770 type_arg = get_args(annotation)[0] 

771 return _resolve_annotation(type_arg, sig, func, node, context) 

772 elif isinstance(annotation, NewType): 

773 return _eval_or_create_duck(annotation.__supertype__, node, context) 

774 elif isinstance(annotation, TypeAliasType): 

775 return _eval_or_create_duck(annotation.__value__, node, context) 

776 else: 

777 return _eval_or_create_duck(annotation, node, context) 

778 

779 

780def _eval_node_name(node_id: str, context: EvaluationContext): 

781 policy = get_policy(context) 

782 if policy.allow_locals_access and node_id in context.locals: 

783 return context.locals[node_id] 

784 if policy.allow_globals_access and node_id in context.globals: 

785 return context.globals[node_id] 

786 if policy.allow_builtins_access and hasattr(builtins, node_id): 

787 # note: do not use __builtins__, it is implementation detail of cPython 

788 return getattr(builtins, node_id) 

789 if policy.allow_auto_import and context.auto_import: 

790 return context.auto_import(node_id) 

791 if not policy.allow_globals_access and not policy.allow_locals_access: 

792 raise GuardRejection( 

793 f"Namespace access not allowed in {context.evaluation} mode" 

794 ) 

795 else: 

796 raise NameError(f"{node_id} not found in locals, globals, nor builtins") 

797 

798 

799def _eval_or_create_duck(duck_type, node: ast.Call, context: EvaluationContext): 

800 policy = get_policy(context) 

801 # if allow-listed builtin is on type annotation, instantiate it 

802 if policy.can_call(duck_type) and not node.keywords: 

803 args = [eval_node(arg, context) for arg in node.args] 

804 return duck_type(*args) 

805 # if custom class is in type annotation, mock it 

806 return _create_duck_for_heap_type(duck_type) 

807 

808 

809def _create_duck_for_heap_type(duck_type): 

810 """Create an imitation of an object of a given type (a duck). 

811 

812 Returns the duck or NOT_EVALUATED sentinel if duck could not be created. 

813 """ 

814 duck = ImpersonatingDuck() 

815 try: 

816 # this only works for heap types, not builtins 

817 duck.__class__ = duck_type 

818 return duck 

819 except TypeError: 

820 pass 

821 return NOT_EVALUATED 

822 

823 

824SUPPORTED_EXTERNAL_GETITEM = { 

825 ("pandas", "core", "indexing", "_iLocIndexer"), 

826 ("pandas", "core", "indexing", "_LocIndexer"), 

827 ("pandas", "DataFrame"), 

828 ("pandas", "Series"), 

829 ("numpy", "ndarray"), 

830 ("numpy", "void"), 

831} 

832 

833 

834BUILTIN_GETITEM: set[InstancesHaveGetItem] = { 

835 dict, 

836 str, # type: ignore[arg-type] 

837 bytes, # type: ignore[arg-type] 

838 list, 

839 tuple, 

840 collections.defaultdict, 

841 collections.deque, 

842 collections.OrderedDict, 

843 collections.ChainMap, 

844 collections.UserDict, 

845 collections.UserList, 

846 collections.UserString, # type: ignore[arg-type] 

847 _DummyNamedTuple, 

848 _IdentitySubscript, 

849} 

850 

851 

852def _list_methods(cls, source=None): 

853 """For use on immutable objects or with methods returning a copy""" 

854 return [getattr(cls, k) for k in (source if source else dir(cls))] 

855 

856 

857dict_non_mutating_methods = ("copy", "keys", "values", "items") 

858list_non_mutating_methods = ("copy", "index", "count") 

859set_non_mutating_methods = set(dir(set)) & set(dir(frozenset)) 

860 

861 

862dict_keys: type[collections.abc.KeysView] = type({}.keys()) 

863 

864NUMERICS = {int, float, complex} 

865 

866ALLOWED_CALLS = { 

867 bytes, 

868 *_list_methods(bytes), 

869 dict, 

870 *_list_methods(dict, dict_non_mutating_methods), 

871 dict_keys.isdisjoint, 

872 list, 

873 *_list_methods(list, list_non_mutating_methods), 

874 set, 

875 *_list_methods(set, set_non_mutating_methods), 

876 frozenset, 

877 *_list_methods(frozenset), 

878 range, 

879 str, 

880 *_list_methods(str), 

881 tuple, 

882 *_list_methods(tuple), 

883 *NUMERICS, 

884 *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)], 

885 collections.deque, 

886 *_list_methods(collections.deque, list_non_mutating_methods), 

887 collections.defaultdict, 

888 *_list_methods(collections.defaultdict, dict_non_mutating_methods), 

889 collections.OrderedDict, 

890 *_list_methods(collections.OrderedDict, dict_non_mutating_methods), 

891 collections.UserDict, 

892 *_list_methods(collections.UserDict, dict_non_mutating_methods), 

893 collections.UserList, 

894 *_list_methods(collections.UserList, list_non_mutating_methods), 

895 collections.UserString, 

896 *_list_methods(collections.UserString, dir(str)), 

897 collections.Counter, 

898 *_list_methods(collections.Counter, dict_non_mutating_methods), 

899 collections.Counter.elements, 

900 collections.Counter.most_common, 

901} 

902 

903BUILTIN_GETATTR: set[MayHaveGetattr] = { 

904 *BUILTIN_GETITEM, 

905 set, 

906 frozenset, 

907 object, 

908 type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`. 

909 *NUMERICS, 

910 dict_keys, 

911 MethodDescriptorType, 

912 ModuleType, 

913} 

914 

915 

916BUILTIN_OPERATIONS = {*BUILTIN_GETATTR} 

917 

918EVALUATION_POLICIES = { 

919 "minimal": EvaluationPolicy( 

920 allow_builtins_access=True, 

921 allow_locals_access=False, 

922 allow_globals_access=False, 

923 allow_item_access=False, 

924 allow_attr_access=False, 

925 allowed_calls=set(), 

926 allow_any_calls=False, 

927 allow_all_operations=False, 

928 ), 

929 "limited": SelectivePolicy( 

930 allowed_getitem=BUILTIN_GETITEM, 

931 allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM, 

932 allowed_getattr=BUILTIN_GETATTR, 

933 allowed_getattr_external={ 

934 # pandas Series/Frame implements custom `__getattr__` 

935 ("pandas", "DataFrame"), 

936 ("pandas", "Series"), 

937 }, 

938 allowed_operations=BUILTIN_OPERATIONS, 

939 allow_builtins_access=True, 

940 allow_locals_access=True, 

941 allow_globals_access=True, 

942 allowed_calls=ALLOWED_CALLS, 

943 ), 

944 "unsafe": EvaluationPolicy( 

945 allow_builtins_access=True, 

946 allow_locals_access=True, 

947 allow_globals_access=True, 

948 allow_attr_access=True, 

949 allow_item_access=True, 

950 allow_any_calls=True, 

951 allow_all_operations=True, 

952 ), 

953} 

954 

955 

956__all__ = [ 

957 "guarded_eval", 

958 "eval_node", 

959 "GuardRejection", 

960 "EvaluationContext", 

961 "_unbind_method", 

962]